]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Pass upstream when sending TCP requests
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 2 Jul 2022 13:53:24 +0000 (14:53 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 2 Jul 2022 13:53:24 +0000 (14:53 +0100)
12 files changed:
lualib/lua_scanners/avast.lua
lualib/lua_scanners/clamav.lua
lualib/lua_scanners/dcc.lua
lualib/lua_scanners/fprot.lua
lualib/lua_scanners/icap.lua
lualib/lua_scanners/kaspersky_av.lua
lualib/lua_scanners/oletools.lua
lualib/lua_scanners/pyzor.lua
lualib/lua_scanners/razor.lua
lualib/lua_scanners/savapi.lua
lualib/lua_scanners/sophos.lua
lualib/lua_scanners/spamassassin.lua

index 6ecf6756fc4431d1bd6835a2feff21d5f88c7d9d..5c54f8c40fd1b9780ddac983413f4841c04a2a48 100644 (file)
@@ -93,6 +93,7 @@ local function avast_check(task, content, digest, rule, maybe_part)
       stop_pattern = CRLF,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule.timeout,
       task = task
     }
@@ -155,6 +156,7 @@ local function avast_check(task, content, digest, rule, maybe_part)
 
       upstream = rule.upstreams:get_upstream_round_robin()
       addr = upstream:get_addr()
+      tcp_opts.upstream = upstream
       tcp_opts.callback = avast_helo_cb
 
       local is_succ, err = tcp.request(tcp_opts)
@@ -224,7 +226,6 @@ local function avast_check(task, content, digest, rule, maybe_part)
             end
           elseif beg == '200' then
             -- Final line
-            upstream:ok()
             if tcp_conn then
               tcp_conn:close()
               tcp_conn = nil
index 48aaa66c4a6e581a75fde0bea56570bc9bcd780e..f984864e78b8af1e684791d4f964d7dbf13f3ceb 100644 (file)
@@ -91,9 +91,6 @@ local function clamav_check(task, content, digest, rule, maybe_part)
     local function clamav_callback(err, data)
       if err then
 
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -110,6 +107,7 @@ local function clamav_check(task, content, digest, rule, maybe_part)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             callback = clamav_callback,
             data = { header, content, footer },
@@ -123,7 +121,6 @@ local function clamav_check(task, content, digest, rule, maybe_part)
         end
 
       else
-        upstream:ok()
         data = tostring(data)
         local cached
         lua_util.debugm(rule.name, task, '%s: got reply: %s',
@@ -172,6 +169,7 @@ local function clamav_check(task, content, digest, rule, maybe_part)
       port = addr:get_port(),
       timeout = rule['timeout'],
       callback = clamav_callback,
+      upstream = upstream,
       data = { header, content, footer },
       stop_pattern = '\0'
     })
index 9c46194792b7c781db7b80766b94667a80fc7ae5..5cf0ef4a631d0f9030fec0171e6a3423066af36c 100644 (file)
@@ -138,9 +138,6 @@ local function dcc_check(task, content, digest, rule)
     local function dcc_callback(err, data, conn)
 
       local function dcc_requery()
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -158,6 +155,7 @@ local function dcc_check(task, content, digest, rule)
             host = addr:to_string(),
             port = addr:get_port(),
             timeout = rule.timeout or 2.0,
+            upstream = upstream,
             shutdown = true,
             data = request_data,
             callback = dcc_callback,
@@ -178,7 +176,6 @@ local function dcc_check(task, content, digest, rule)
 
       else
         -- Parse the response
-        if upstream then upstream:ok() end
         local _,_,result,disposition,header = tostring(data):find("(.-)\n(.-)\n(.-)$")
         lua_util.debugm(rule.name, task, 'DCC result=%1 disposition=%2 header="%3"',
             result, disposition, header)
@@ -287,6 +284,7 @@ local function dcc_check(task, content, digest, rule)
       port = addr:get_port(),
       timeout = rule.timeout or 2.0,
       shutdown = true,
+      upstream = upstream,
       data = request_data,
       callback = dcc_callback,
       body_max = 999999,
index 35c4c943ddf1ab08ae784063f1046cf3c31ae47e..d2153f7acd15addb5e4985289384f350a7745795 100644 (file)
@@ -91,9 +91,6 @@ local function fprot_check(task, content, digest, rule, maybe_part)
 
     local function fprot_callback(err, data)
       if err then
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -110,6 +107,7 @@ local function fprot_check(task, content, digest, rule, maybe_part)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             callback = fprot_callback,
             data = { header, content, footer },
@@ -155,6 +153,7 @@ local function fprot_check(task, content, digest, rule, maybe_part)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule['timeout'],
       callback = fprot_callback,
       data = { header, content, footer },
index 1d783e8344ec3d1809839c6381a3d791a370956b..ba571b13dbceb36831679d2e5d82832db2571502 100644 (file)
@@ -207,9 +207,6 @@ local function icap_check(task, content, digest, rule, maybe_part)
     local function icap_callback(err, conn)
 
       local function icap_requery(err_m, info)
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -230,6 +227,7 @@ local function icap_check(task, content, digest, rule, maybe_part)
           tcp_options.port = addr:get_port()
           tcp_options.callback = icap_callback
           tcp_options.data = options_request
+          tcp_options.upstream = upstream
 
           tcp.request(tcp_options)
 
@@ -649,8 +647,6 @@ local function icap_check(task, content, digest, rule, maybe_part)
       if err or conn == nil then
         icap_requery(err, "options_request")
       else
-        -- set upstream ok
-        if upstream then upstream:ok() end
         conn:add_read(icap_r_options_cb, '\r\n\r\n')
       end
     end
@@ -671,6 +667,7 @@ local function icap_check(task, content, digest, rule, maybe_part)
 
     tcp_options.host = addr:to_string()
     tcp_options.port = addr:get_port()
+    tcp_options.upstream = upstream
 
     tcp.request(tcp_options)
   end
index fc395f363c74effdff79d9554e0d5d76857526d3..69aaad7e5b894d61a4229dda108bbf6e0f68b152 100644 (file)
@@ -110,8 +110,6 @@ local function kaspersky_check(task, content, digest, rule, maybe_part)
 
     local function kaspersky_callback(err, data)
       if err then
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
 
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
@@ -129,6 +127,7 @@ local function kaspersky_check(task, content, digest, rule, maybe_part)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             callback = kaspersky_callback,
             data = { clamav_compat_cmd },
@@ -144,7 +143,6 @@ local function kaspersky_check(task, content, digest, rule, maybe_part)
         end
 
       else
-        upstream:ok()
         data = tostring(data)
         local cached
         lua_util.debugm(rule.name, task,
@@ -174,6 +172,7 @@ local function kaspersky_check(task, content, digest, rule, maybe_part)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule['timeout'],
       callback = kaspersky_callback,
       data = { clamav_compat_cmd },
index 04cdfad3e50980f9c6073cabf4e7ab3530898dfa..8513699a00fd6b434a976cb59979cb6083c3924f 100644 (file)
@@ -98,8 +98,6 @@ local function oletools_check(task, content, digest, rule, maybe_part)
     local function oletools_callback(err, data, conn)
 
       local function oletools_requery(error)
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
 
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
@@ -117,6 +115,7 @@ local function oletools_check(task, content, digest, rule, maybe_part)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule.timeout,
             shutdown = true,
             data = { protocol, content },
@@ -136,9 +135,6 @@ local function oletools_check(task, content, digest, rule, maybe_part)
         oletools_requery(err)
 
       else
-        -- Parse the response
-        if upstream then upstream:ok() end
-
         json_response = json_response .. tostring(data)
 
              if not string.find(json_response, '\t\n\n\t') and #data == 8192 then
@@ -346,6 +342,7 @@ local function oletools_check(task, content, digest, rule, maybe_part)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule.timeout,
       shutdown = true,
       data = { protocol, content },
index ed0e3d16b070165a40d353e2c4613e7bfd172535..78250a3adbe80e2983e637b63b8dc28b3f3aac21 100644 (file)
@@ -85,9 +85,6 @@ local function pyzor_check(task, content, digest, rule)
 
       if err then
 
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -104,6 +101,7 @@ local function pyzor_check(task, content, digest, rule)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             shutdown = true,
             data = content,
@@ -116,8 +114,6 @@ local function pyzor_check(task, content, digest, rule)
               'failed to scan and retransmits exceed')
         end
       else
-        -- Parse the response
-        if upstream then upstream:ok() end
         -- pyzor output is unicode (\x09 -> tab, \0a -> newline)
         --   public.pyzor.org:24441  (200, 'OK')     21285091   206759
         --   server:port             Code  Diag      Count      WL-Count
@@ -187,6 +183,7 @@ local function pyzor_check(task, content, digest, rule)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule.timeout,
       shutdown = true,
       data = content,
index 62b1e06a1a8cb94f2c1a9cc9b57220d479acb064..7de4c84eb96d34ad0408bc5c4aedeea77803b152 100644 (file)
@@ -90,9 +90,6 @@ local function razor_check(task, content, digest, rule)
     local function razor_callback(err, data, conn)
 
       local function razor_requery()
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -112,6 +109,7 @@ local function razor_check(task, content, digest, rule)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule.timeout or 2.0,
             shutdown = true,
             data = content,
@@ -129,9 +127,6 @@ local function razor_check(task, content, digest, rule)
         razor_requery()
 
       else
-        -- Parse the response
-        if upstream then upstream:ok() end
-
         --[[
         @todo: Razorsocket currently only returns ham or spam. When the wrapper is fixed we should add dynamic scores here.
         Maybe check spamassassin implementation.
@@ -163,6 +158,7 @@ local function razor_check(task, content, digest, rule)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule.timeout or 2.0,
       shutdown = true,
       data = content,
index 2933aa7b4a4fb85cad98b33c246c931bcdaf75da..430009df3b305930c7bcdfabb187685590821ee0 100644 (file)
@@ -198,9 +198,6 @@ local function savapi_check(task, content, digest, rule)
     local function savapi_callback_init(err, data, conn)
       if err then
 
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -217,6 +214,7 @@ local function savapi_check(task, content, digest, rule)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             callback = savapi_callback_init,
             stop_pattern = {'\n'},
@@ -226,7 +224,6 @@ local function savapi_check(task, content, digest, rule)
           common.yield_result(task, rule, 'failed to scan and retransmits exceed', 0.0, 'fail')
         end
       else
-        upstream:ok()
         local result = tostring(data)
 
         -- 100 SAVAPI:4.0 greeting
@@ -240,6 +237,7 @@ local function savapi_check(task, content, digest, rule)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule['timeout'],
       callback = savapi_callback_init,
       stop_pattern = {'\n'},
index b82a71025e429036d44a0edc23676981dd4ec8df..d9b64f1a80e410e161d182a8191974dc04007a0d 100644 (file)
@@ -90,9 +90,6 @@ local function sophos_check(task, content, digest, rule, maybe_part)
     local function sophos_callback(err, data, conn)
 
       if err then
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
-
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
 
@@ -109,6 +106,7 @@ local function sophos_check(task, content, digest, rule, maybe_part)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             callback = sophos_callback,
             data = { protocol, streamsize, content, bye }
@@ -119,7 +117,6 @@ local function sophos_check(task, content, digest, rule, maybe_part)
               0.0, 'fail', maybe_part)
         end
       else
-        upstream:ok()
         data = tostring(data)
         lua_util.debugm(rule.name, task,
             '%s [%s]: got reply: %s', rule['symbol'], rule['type'], data)
@@ -170,6 +167,7 @@ local function sophos_check(task, content, digest, rule, maybe_part)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule['timeout'],
       callback = sophos_callback,
       data = { protocol, streamsize, content, bye }
index c50c477a487e3da56e0e3fa27f529f29a8be6dbe..c4fd0d2e5af713e9943cd1e0df5bc98b2d2f93b4 100644 (file)
@@ -103,8 +103,6 @@ local function spamassassin_check(task, content, digest, rule)
     local function spamassassin_callback(err, data)
 
       local function spamassassin_requery(error)
-        -- set current upstream to fail because an error occurred
-        upstream:fail()
 
         -- retry with another upstream until retransmits exceeds
         if retransmits > 0 then
@@ -125,6 +123,7 @@ local function spamassassin_check(task, content, digest, rule)
             task = task,
             host = addr:to_string(),
             port = addr:get_port(),
+            upstream = upstream,
             timeout = rule['timeout'],
             data = request_data,
             callback = spamassassin_callback,
@@ -141,9 +140,6 @@ local function spamassassin_check(task, content, digest, rule)
         spamassassin_requery(err)
 
       else
-        -- Parse the response
-        if upstream then upstream:ok() end
-
         --lua_util.debugm(rule.N, task, '%s: returned result: %s', rule.log_prefix, data)
 
         --[[
@@ -193,6 +189,7 @@ local function spamassassin_check(task, content, digest, rule)
       task = task,
       host = addr:to_string(),
       port = addr:get_port(),
+      upstream = upstream,
       timeout = rule['timeout'],
       data = request_data,
       callback = spamassassin_callback,