|
|
@@ -79,12 +79,30 @@ local function elastic_send_data(task) |
|
|
|
|
|
|
|
local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk' |
|
|
|
local bulk_json = table.concat(tbl, "\n") |
|
|
|
local err, response = rspamd_http.request({ |
|
|
|
|
|
|
|
local function http_callback(err, code, _, _) |
|
|
|
if err then |
|
|
|
rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s", |
|
|
|
push_url, err, failed_sends, settings.max_fail) |
|
|
|
else |
|
|
|
if code ~= 200 then |
|
|
|
rspamd_logger.infox(task, |
|
|
|
"cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s", |
|
|
|
push_url, err, code, failed_sends, settings.max_fail) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES", |
|
|
|
nrows, #bulk_json) |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
rspamd_http.request({ |
|
|
|
url = push_url, |
|
|
|
headers = { |
|
|
|
['Content-Type'] = 'application/x-ndjson', |
|
|
|
}, |
|
|
|
body = bulk_json, |
|
|
|
callback = http_callback, |
|
|
|
task = task, |
|
|
|
method = 'post', |
|
|
|
gzip = settings.use_gzip, |
|
|
@@ -93,24 +111,6 @@ local function elastic_send_data(task) |
|
|
|
password = settings.password, |
|
|
|
timeout = settings.timeout, |
|
|
|
}) |
|
|
|
|
|
|
|
if err then |
|
|
|
rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s", |
|
|
|
push_url, err, failed_sends, settings.max_fail) |
|
|
|
else |
|
|
|
if response.code ~= 200 then |
|
|
|
rspamd_logger.infox(task, |
|
|
|
"cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s", |
|
|
|
push_url, err, response.code, failed_sends, settings.max_fail) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES", |
|
|
|
nrows, #bulk_json) |
|
|
|
|
|
|
|
return true |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
return false |
|
|
|
end |
|
|
|
|
|
|
|
local function get_general_metadata(task) |