+# Copyright 2024 Vsevolod Stakhov
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from urllib.request import urlopen
import glob
import grp
from robot.libraries.BuiltIn import BuiltIn
import demjson
+
def Check_JSON(j):
d = demjson.decode(j, strict=True)
logger.debug('got json %s' % d)
assert 'error' not in d
return d
+
def check_json_log(fn):
line_count = 0
f = open(fn, 'r')
line_count = line_count + 1
assert line_count > 0
+
def cleanup_temporary_directory(directory):
shutil.rmtree(directory)
+
def save_run_results(directory, filenames):
current_directory = os.getcwd()
suite_name = BuiltIn().get_variable_value("${SUITE_NAME}")
shutil.copy(source_file, "%s/%s" % (destination_directory, file))
shutil.copy(source_file, "%s/robot-save/%s.last" % (current_directory, file))
+
def encode_filename(filename):
return "".join(['%%%0X' % ord(b) for b in filename])
+
def get_test_directory():
return os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "../../")
+
def get_top_dir():
if os.environ.get('RSPAMD_TOPDIR'):
return os.environ['RSPAMD_TOPDIR']
return get_test_directory() + "/../../"
+
def get_install_root():
if os.environ.get('RSPAMD_INSTALLROOT'):
return os.path.abspath(os.environ['RSPAMD_INSTALLROOT'])
return os.path.abspath("../install/")
+
def get_rspamd():
if os.environ.get('RSPAMD'):
return os.environ['RSPAMD']
dname = get_top_dir()
return dname + "/src/rspamd"
+
def get_rspamc():
if os.environ.get('RSPAMC'):
return os.environ['RSPAMC']
dname = get_top_dir()
return dname + "/src/client/rspamc"
+
def get_rspamadm():
if os.environ.get('RSPAMADM'):
return os.environ['RSPAMADM']
dname = get_top_dir()
return dname + "/src/rspamadm/rspamadm"
+
def HTTP(method, host, port, path, data=None, headers={}):
c = http.client.HTTPConnection("%s:%s" % (host, port))
c.request(method, path, data, headers)
c.close()
return [s, t]
+
def hard_link(src, dst):
os.link(src, dst)
+
def make_temporary_directory():
"""Creates and returns a unique temporary directory
stat.S_IXOTH)
return dirname
+
def make_temporary_file():
return tempfile.mktemp()
+
def path_splitter(path):
dirname = os.path.dirname(path)
basename = os.path.basename(path)
return [dirname, basename]
+
def rspamc(addr, port, filename):
mboxgoo = b"From MAILER-DAEMON Fri May 13 19:17:40 2016\r\n"
goo = open(filename, 'rb').read()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((addr, port))
s.send(b"CHECK RSPAMC/1.0\r\nContent-length: ")
- s.send(str(len(goo+mboxgoo)).encode('utf-8'))
+ s.send(str(len(goo + mboxgoo)).encode('utf-8'))
s.send(b"\r\n\r\n")
s.send(mboxgoo)
s.send(goo)
r = s.recv(2048)
return r.decode('utf-8')
+
def Scan_File(filename, **headers):
addr = BuiltIn().get_variable_value("${RSPAMD_LOCAL_ADDR}")
port = BuiltIn().get_variable_value("${RSPAMD_PORT_NORMAL}")
BuiltIn().set_test_variable("${SCAN_RESULT}", d)
return
+
def Send_SIGUSR1(pid):
pid = int(pid)
os.kill(pid, signal.SIGUSR1)
+
def set_directory_ownership(path, username, groupname):
if os.getuid() == 0:
- uid=pwd.getpwnam(username).pw_uid
- gid=grp.getgrnam(groupname).gr_gid
+ uid = pwd.getpwnam(username).pw_uid
+ gid = grp.getgrnam(groupname).gr_gid
os.chown(path, uid, gid)
+
def spamc(addr, port, filename):
goo = open(filename, 'rb').read()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = s.recv(2048)
return r.decode('utf-8')
+
def TCP_Connect(addr, port):
"""Attempts to open a TCP connection to specified address:port
| Wait Until Keyword Succeeds | 5s | 10ms | TCP Connect | localhost | 8080 |
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(5) # seconds
+ s.settimeout(5) # seconds
s.connect((addr, port))
s.close()
+
def ping_rspamd(addr, port):
return str(urlopen("http://%s:%s/ping" % (addr, port)).read())
+
def redis_check(addr, port):
"""Attempts to open a TCP connection to specified address:port
| Wait Until Keyword Succeeds | 5s | 10ms | TCP Connect | localhost | 8080 |
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(1.0) # seconds
+ s.settimeout(1.0) # seconds
s.connect((addr, port))
if s.sendall(b"ECHO TEST\n"):
result = s.recv(128)
else:
return False
+
def update_dictionary(a, b):
a.update(b)
return a
TERM_TIMEOUT = 10 # wait after sending a SIGTERM signal
KILL_WAIT = 20 # additional wait after sending a SIGKILL signal
+
def shutdown_process(process):
# send SIGTERM
process.terminate()
process.wait(TERM_TIMEOUT)
return
except psutil.TimeoutExpired:
- logger.info( "PID {} is not terminated in {} seconds, sending SIGKILL...".format(process.pid, TERM_TIMEOUT))
+ logger.info("PID {} is not terminated in {} seconds, sending SIGKILL...".format(process.pid, TERM_TIMEOUT))
try:
# send SIGKILL
process.kill()
pass
psutil.wait_procs(children, timeout=KILL_WAIT)
+
def write_to_stdin(process_handle, text):
if not isinstance(text, bytes):
text = bytes(text, 'utf-8')
out = obj.stdout.read(4096)
return out.decode('utf-8')
+
def get_file_if_exists(file_path):
if os.path.exists(file_path):
with open(file_path, 'r') as myfile:
return myfile.read()
return None
+
def _merge_luacov_stats(statsfile, coverage):
"""
Reads a coverage stats file written by luacov and merges coverage data to
| Collect Lua Coverage |
"""
# decided not to do optional coverage so far
- #if not 'ENABLE_LUA_COVERAGE' in os.environ['HOME']:
+ # if not 'ENABLE_LUA_COVERAGE' in os.environ['HOME']:
# logger.info("ENABLE_LUA_COVERAGE is not present in env, will not collect Lua coverage")
# return