You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

metric_exporter.lua 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. --[[
  2. Copyright (c) 2016, Andrew Lewis <nerf@judo.za.org>
  3. Copyright (c) 2016, Vsevolod Stakhov <vsevolod@highsecure.ru>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]] --
  14. if confighelp then
  15. return
  16. end
  17. local N = 'metric_exporter'
  18. local logger = require "rspamd_logger"
  19. local mempool = require "rspamd_mempool"
  20. local util = require "rspamd_util"
  21. local tcp = require "rspamd_tcp"
  22. local lua_util = require "lua_util"
  23. local pool
  24. local settings = {
  25. interval = 120,
  26. timeout = 15,
  27. statefile = string.format('%s/%s', rspamd_paths['DBDIR'], 'metric_exporter_last_push')
  28. }
  29. local VAR_NAME = 'metric_exporter_last_push'
  30. local valid_metrics = {
  31. 'actions.add header',
  32. 'actions.greylist',
  33. 'actions.no action',
  34. 'actions.reject',
  35. 'actions.rewrite subject',
  36. 'actions.soft reject',
  37. 'bytes_allocated',
  38. 'chunks_allocated',
  39. 'chunks_freed',
  40. 'chunks_oversized',
  41. 'connections',
  42. 'control_connections',
  43. 'ham_count',
  44. 'learned',
  45. 'pools_allocated',
  46. 'pools_freed',
  47. 'scanned',
  48. 'shared_chunks_allocated',
  49. 'spam_count',
  50. }
  51. local function validate_metrics(settings_metrics)
  52. if type(settings_metrics) ~= 'table' or #settings_metrics == 0 then
  53. logger.errx(rspamd_config, 'No metrics specified for collection')
  54. return false
  55. end
  56. for _, v in ipairs(settings_metrics) do
  57. local isvalid = false
  58. for _, vm in ipairs(valid_metrics) do
  59. if vm == v then
  60. isvalid = true
  61. break
  62. end
  63. end
  64. if not isvalid then
  65. logger.errx('Invalid metric: %s', v)
  66. return false
  67. end
  68. local split = rspamd_str_split(v, '.')
  69. if #split > 2 then
  70. logger.errx('Too many dots in metric name: %s', v)
  71. return false
  72. end
  73. end
  74. return true
  75. end
  76. local function load_defaults(defaults)
  77. for k, v in pairs(defaults) do
  78. if settings[k] == nil then
  79. settings[k] = v
  80. end
  81. end
  82. end
  83. local function graphite_config()
  84. load_defaults({
  85. host = 'localhost',
  86. port = 2003,
  87. metric_prefix = 'rspamd'
  88. })
  89. return validate_metrics(settings['metrics'])
  90. end
  91. local function graphite_push(kwargs)
  92. local stamp
  93. if kwargs['time'] then
  94. stamp = math.floor(kwargs['time'])
  95. else
  96. stamp = math.floor(util.get_time())
  97. end
  98. local metrics_str = {}
  99. for _, v in ipairs(settings['metrics']) do
  100. local mvalue
  101. local mname = string.format('%s.%s', settings['metric_prefix'], v:gsub(' ', '_'))
  102. local split = rspamd_str_split(v, '.')
  103. if #split == 1 then
  104. mvalue = kwargs['stats'][v]
  105. elseif #split == 2 then
  106. mvalue = kwargs['stats'][split[1]][split[2]]
  107. end
  108. table.insert(metrics_str, string.format('%s %s %s', mname, mvalue, stamp))
  109. end
  110. metrics_str = table.concat(metrics_str, '\n')
  111. tcp.request({
  112. ev_base = kwargs['ev_base'],
  113. config = rspamd_config,
  114. host = settings['host'],
  115. port = settings['port'],
  116. timeout = settings['timeout'],
  117. read = false,
  118. data = {
  119. metrics_str, '\n',
  120. },
  121. callback = (function (err)
  122. if err then
  123. logger.errx('Push failed: %1', err)
  124. return
  125. end
  126. pool:set_variable(VAR_NAME, stamp)
  127. end)
  128. })
  129. end
  130. local backends = {
  131. graphite = {
  132. configure = graphite_config,
  133. push = graphite_push,
  134. },
  135. }
  136. local function configure_metric_exporter()
  137. local opts = rspamd_config:get_all_opt(N)
  138. local be = opts['backend']
  139. if not be then
  140. logger.debugm(N, rspamd_config, 'Backend is unspecified')
  141. return
  142. end
  143. if not backends[be] then
  144. logger.errx(rspamd_config, 'Backend is invalid: ' .. be)
  145. return false
  146. end
  147. for k, v in pairs(opts) do
  148. settings[k] = v
  149. end
  150. return backends[be]['configure']()
  151. end
  152. if not configure_metric_exporter() then
  153. lua_util.disable_module(N, "config")
  154. return
  155. end
  156. rspamd_config:add_on_load(function (_, ev_base, worker)
  157. -- Exit unless we're the first 'controller' worker
  158. if not worker:is_primary_controller() then return end
  159. -- Persist mempool variable to statefile on shutdown
  160. pool = mempool.create()
  161. rspamd_config:register_finish_script(function ()
  162. local stamp = pool:get_variable(VAR_NAME, 'double')
  163. if not stamp then
  164. logger.warn('No last metric exporter push to persist to disk')
  165. return
  166. end
  167. local f, err = io.open(settings['statefile'], 'w')
  168. if err then
  169. logger.errx('Unable to write statefile to disk: %s', err)
  170. return
  171. end
  172. if f then
  173. f:write(pool:get_variable(VAR_NAME, 'double'))
  174. f:close()
  175. end
  176. pool:destroy()
  177. end)
  178. -- Push metrics to backend
  179. local function push_metrics(time)
  180. logger.infox('Pushing metrics to %s backend', settings['backend'])
  181. local args = {
  182. ev_base = ev_base,
  183. stats = worker:get_stat(),
  184. }
  185. if time then
  186. table.insert(args, time)
  187. end
  188. backends[settings['backend']]['push'](args)
  189. end
  190. -- Push metrics at regular intervals
  191. local function schedule_regular_push()
  192. rspamd_config:add_periodic(ev_base, settings['interval'], function ()
  193. push_metrics()
  194. return true
  195. end)
  196. end
  197. -- Push metrics to backend and reschedule check
  198. local function schedule_intermediate_push(when)
  199. rspamd_config:add_periodic(ev_base, when, function ()
  200. push_metrics()
  201. schedule_regular_push()
  202. return false
  203. end)
  204. end
  205. -- Try read statefile on startup
  206. local stamp
  207. local f, err = io.open(settings['statefile'], 'r')
  208. if err then
  209. logger.errx('Failed to open statefile: %s', err)
  210. end
  211. if f then
  212. io.input(f)
  213. stamp = tonumber(io.read())
  214. pool:set_variable(VAR_NAME, stamp)
  215. end
  216. if not stamp then
  217. logger.debugm(N, rspamd_config, 'No state found - pushing stats immediately')
  218. push_metrics()
  219. schedule_regular_push()
  220. return
  221. end
  222. local time = util.get_time()
  223. local delta = stamp - time + settings['interval']
  224. if delta <= 0 then
  225. logger.debugm(N, rspamd_config, 'Last push is too old - pushing stats immediately')
  226. push_metrics(time)
  227. schedule_regular_push()
  228. return
  229. end
  230. logger.debugm(N, rspamd_config, 'Scheduling next push in %s seconds', delta)
  231. schedule_intermediate_push(delta)
  232. end)