Moved code for pid writing and terminating into a separated module.
Also added cleanup: in some cases, processes remained in the system after test is done. It should not happen anymore
Run Dummy Clam ${PORT_CLAM}
${result} = Scan Message With Rspamc ${MESSAGE}
Check Rspamc ${result} CLAM_VIRUS inverse=1
+ Shutdown clamav
CLAMAV HIT
Run Dummy Clam ${PORT_CLAM} 1
${result} = Scan Message With Rspamc ${MESSAGE2}
Check Rspamc ${result} CLAM_VIRUS (1.00)[Eicar-Test-Signature]
Should Not Contain ${result.stdout} CLAMAV_FAIL
+ Shutdown clamav
CLAMAV CACHE HIT
${result} = Scan Message With Rspamc ${MESSAGE2}
${result} = Scan Message With Rspamc ${MESSAGE2}
Check Rspamc ${result} FPROT_VIRUS inverse=1
Should Not Contain ${result.stdout} FPROT_EICAR
+ Shutdown fport
FPROT HIT - PATTERN
Run Dummy Fprot ${PORT_FPROT} 1
Should Contain ${result.stdout} FPROT_VIRUS_DUPLICATE_PATTERN
Should Not Contain ${result.stdout} FPROT_VIRUS_DUPLICATE_DEFAULT
Should Not Contain ${result.stdout} FPROT_VIRUS_DUPLICATE_NOPE
+ Shutdown fport
+ Shutdown fport duplicate
FPROT CACHE HIT
${result} = Scan Message With Rspamc ${MESSAGE}
Antivirus Teardown
Normal Teardown
Shutdown Process With Children ${REDIS_PID}
+ Shutdown clamav
+ Shutdown fport
+ Terminate All Processes kill=True
+
+Shutdown clamav
+ ${clamav_pid} = Get File if exists /tmp/dummy_clamav.pid
+ Run Keyword if ${clamav_pid} Shutdown Process With Children ${clamav_pid}
+
+Shutdown fport
+ ${fport_pid} = Get File if exists /tmp/dummy_fprot.pid
+ Run Keyword if ${fport_pid} Shutdown Process With Children ${fport_pid}
+
+Shutdown fport duplicate
+ ${fport_pid} = Get File if exists /tmp/dummy_fprot_dupe.pid
+ Run Keyword if ${fport_pid} Shutdown Process With Children ${fport_pid}
Run Dummy Clam
[Arguments] ${port} ${found}=
Run Dummy Fprot
[Arguments] ${port} ${found}= ${pid}=/tmp/dummy_fprot.pid
- ${result} = Start Process ${TESTDIR}/util/dummy_fprot.py ${port} ${found} ${pid}
+ Start Process ${TESTDIR}/util/dummy_fprot.py ${port} ${found} ${pid}
Wait Until Created ${pid}
except:
pass
+def get_file_if_exists(file_path):
+ if os.path.exists(file_path):
+ with open(file_path, 'r') as myfile:
+ return myfile.read()
+ return None
import os
import sys
+import socket
+import dummy_killer
try:
import SocketServer as socketserver
except:
import socketserver
-import signal
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
- os.remove(PID)
self.data = self.request.recv(1024).strip()
if self.server.foundvirus:
self.request.sendall(b"stream: Eicar-Test-Signature FOUND\0")
self.request.close()
if __name__ == "__main__":
- pid = os.fork()
- if pid > 0:
- sys.exit(0)
- signal.alarm(5)
-
HOST = "localhost"
alen = len(sys.argv)
server.foundvirus = foundvirus
server.server_bind()
server.server_activate()
- open(PID, 'w').close()
- server.handle_request()
+
+ dummy_killer.setup_killer(server)
+ dummy_killer.write_pid(PID)
+
+ try:
+ server.handle_request()
+ except socket.error:
+ print "Socket closed"
+
server.server_close()
- os.exit(0)
import os
import sys
import signal
+import socket
+import dummy_killer
try:
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
- os.remove(PID)
self.data = self.request.recv(1024).strip()
if self.server.foundvirus:
self.request.sendall(b"1 <infected: EICAR_Test_File> FOO->bar\n")
self.request.close()
if __name__ == "__main__":
- pid = os.fork()
- if pid > 0:
- sys.exit(0)
- signal.alarm(5)
HOST = "localhost"
server.foundvirus = foundvirus
server.server_bind()
server.server_activate()
- open(PID, 'w').close()
- server.handle_request()
+
+ dummy_killer.setup_killer(server)
+ dummy_killer.write_pid(PID)
+
+ try:
+ server.handle_request()
+ except socket.error:
+ print "Socket closed"
server.server_close()
- os.exit(0)
import BaseHTTPServer
import SocketServer
import SimpleHTTPServer
+import dummy_killer
import time
import os
import sys
-import signal
import socket
PORT = 18080
self.timeout = 1
def run(self):
- with open(PID, 'w+') as f:
- f.write(str(os.getpid()))
- f.close()
+ dummy_killer.write_pid(PID)
try:
while 1:
sys.stdout.flush()
print "Interrupt"
except socket.error:
print "Socket closed"
- pass
def stop(self):
self.keep_running = False
if __name__ == '__main__':
server = ThreadingSimpleServer()
- def alarm_handler(signum, frame):
- server.stop()
-
- signal.signal(signal.SIGALRM, alarm_handler)
- signal.signal(signal.SIGTERM, alarm_handler)
- signal.alarm(1000)
+ dummy_killer.setup_killer(server, server.stop)
server.run()
--- /dev/null
+import signal
+import os
+import atexit
+
+def setup_killer(server, method = None):
+ def default_method():
+ server.server_close()
+
+ if method is None:
+ method = default_method
+
+ def alarm_handler(signum, frame):
+ method()
+
+ signal.signal(signal.SIGALRM, alarm_handler)
+ signal.signal(signal.SIGTERM, alarm_handler)
+ signal.alarm(10)
+
+
+def write_pid(path):
+ with open(path, 'w+') as f:
+ f.write(str(os.getpid()))
+ f.close()
+
+ def cleanup():
+ os.remove(path)
+
+ atexit.register(cleanup)