1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import grp
import os
import os.path
import psutil
import pwd
import shutil
import signal
import socket
import string
import sys
import tempfile
import time
if sys.version_info > (3,):
long = int
try:
from urllib.request import urlopen
except:
from urllib2 import urlopen
def cleanup_temporary_directory(directory):
shutil.rmtree(directory)
def encode_filename(filename):
return "".join(['%%%0X' % ord(b) for b in filename])
def get_process_children(pid):
children = []
for p in psutil.process_iter():
# ppid could be int or function depending on library version
if callable(p.ppid):
ppid = p.ppid()
else:
ppid = p.ppid
if ppid == pid:
children.append(p.pid)
return children
def get_test_directory():
return os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "../..")
def make_temporary_directory():
return tempfile.mkdtemp()
def read_log_from_position(filename, offset):
offset = long(offset)
f = open(filename, 'rb')
f.seek(offset)
goo = f.read()
size = len(goo)
return [goo, size+offset]
def rspamc(addr, port, filename):
mboxgoo = b"From MAILER-DAEMON Fri May 13 19:17:40 2016\r\n"
goo = open(filename, 'rb').read()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((addr, port))
s.send(b"CHECK RSPAMC/1.0\r\nContent-length: ")
s.send(str(len(goo+mboxgoo)).encode('utf-8'))
s.send(b"\r\n\r\n")
s.send(mboxgoo)
s.send(goo)
r = s.recv(2048)
return r.decode('utf-8')
def scan_file(addr, port, filename):
return str(urlopen("http://%s:%s/symbols?file=%s" % (addr, port, filename)).read())
def Send_SIGUSR1(pid):
pid = int(pid)
os.kill(pid, signal.SIGUSR1)
def set_directory_ownership(path, username, groupname):
uid=pwd.getpwnam(username).pw_uid
gid=grp.getgrnam(groupname).gr_gid
os.chown(path, uid, gid)
def spamc(addr, port, filename):
goo = open(filename, 'rb').read()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((addr, port))
s.send(b"SYMBOLS SPAMC/1.0\r\nContent-length: ")
s.send(str(len(goo)).encode('utf-8'))
s.send(b"\r\n\r\n")
s.send(goo)
s.shutdown(socket.SHUT_WR)
r = s.recv(2048)
return r.decode('utf-8')
def update_dictionary(a, b):
a.update(b)
return a
def shutdown_process(pid):
i = 0
while i < 100:
try:
os.kill(pid, signal.SIGTERM)
except OSError as e:
assert e.errno == 3
return
i += 1
time.sleep(0.1)
while i < 200:
try:
os.kill(pid, signal.SIGKILL)
except OSError as e:
assert e.errno == 3
return
i += 1
time.sleep(0.1)
assert False, "Failed to shutdown process %s" % pid
def shutdown_process_with_children(pid):
pid = int(pid)
children = get_process_children(pid)
shutdown_process(pid)
for child in children:
shutdown_process(child)
|