CVE-2025-20393

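Catalpa, cybersecurity enthusiast

On December 10, 2025, Cisco published an advisory for an unauthenticated remote command execution vulnerability in Cisco Secure Email Gateway and Cisco Secure Email and Web Manager. The vulnerability affects the Spam Quarantine feature of these systems. In this post we reproduce and analyze it based on the available information.

Basic Information

According to Cisco's advisory, the vulnerability only affects the Spam Quarantine web service, the email security gateway feature that holds and displays intercepted spam. The assigned CWE is CWE-20 (Improper Input Validation). Beyond that, the advisory gives no further specifics.

The advisory was published on December 10, 2025 and the patch was released on January 15, 2026. The vulnerability was already being exploited in the wild before it was fixed.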

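Vulnerability Analysis

Alongside the patch for Cisco Secure Email Gateway and Cisco Secure Email and Web Manager, Cisco took all older system versions offline. Through other channels I could only obtain two versions of Cisco Secure Email Gateway, 16.0.1.017 and 16.0.4.016, so for now the patch diff is limited to these two. The environment can be obtained here: download link

Environment Setup

Import both images into a virtual machine and boot them. At the login prompt, use the default account admin/ironport. Running ifconfig shows the network interface information; browse to IP:443 to reach the web management page. After logging in as admin, the system asks you to complete some basic configuration.

The system does not directly provide a root shell, so privileges have to be obtained manually. After completing the basic configuration, shut the VM down and mount its disk on another FreeBSD system. The key files all turn out to be in the /dev/da1p9 partition.

Searching for some of the text printed after CLI login leads to the CLI program bin/cli.sh. We modify this script directly, adding the command /bin/sh -i at the top, and change the mode of the system's python binary to 4777.

Save the changes and boot. Logging in as admin now drops into an unprivileged shell; from there, run python and call setuid(0) to obtain root. The business-logic code is all under /data/release/current; download the current directory of each version to the local machine.

The Spam Quarantine feature has to be enabled manually in the admin console: turn on spam quarantine under Monitor -> Spam Quarantine, then edit the Management interface under Network -> IP Interfaces and enable Spam Quarantine. The service listens on port 83 by default.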

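Code Decryption

The port listing shows that euq_webui is listening on port 83. A quick reverse-engineering pass shows it is an ELF program built with Python's freeze tooling, and strings in the binary identify the Python version as 2.6.13.

Modules bundled into the ELF are stored in the _PyImport_FrozenModules array, whose entries have this structure: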

struct _frozen {
    const char *name;
    const unsigned char *code;
    int size;
};

code is the marshal-serialized code object for the module. Note that this is not a complete pyc file.
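To make the table layout concrete, here is a minimal Python 3 sketch that walks such an array inside a raw byte image. It assumes an amd64 layout (two 8-byte pointers, a 4-byte int, 4 bytes of padding) and a caller-supplied base address; ENTRY, read_cstring, and walk_frozen_table are hypothetical names, and this is not the author's dump_code.py.

```python
import struct

# Assumed amd64 layout of `struct _frozen`: name pointer, code pointer,
# 32-bit size, then 4 bytes of alignment padding (24 bytes per entry).
ENTRY = struct.Struct("<QQi4x")


def read_cstring(image, offset):
    """Return the NUL-terminated byte string starting at `offset`."""
    end = image.index(b"\x00", offset)
    return image[offset:end]


def walk_frozen_table(image, table_offset, base=0):
    """Yield (name, is_package, code_bytes) for each table entry.

    `base` is the virtual address that corresponds to offset 0 of `image`.
    The walk stops at the first entry whose name pointer is NULL.
    """
    offset = table_offset
    while True:
        name_ptr, code_ptr, size = ENTRY.unpack_from(image, offset)
        if name_ptr == 0:
            break
        name = read_cstring(image, name_ptr - base)
        length = abs(size)  # CPython marks frozen packages with a negative size
        start = code_ptr - base
        yield name.decode("ascii"), size < 0, image[start:start + length]
        offset += ENTRY.size
```

In a real binary the pointers would have to be translated through the ELF segment table rather than a single base address; the single-base mapping is a simplification.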

We wrote an IDAPython script (dump_code.py) to extract the embedded module code, prepending the pyc magic, file timestamp, and other header data.
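The header step can be sketched on its own. The following Python 3 snippet is a minimal illustration, assuming the Python 2.x pyc layout of a 4-byte magic followed by a 4-byte modification time; wrap_as_pyc is a hypothetical helper, and the magic value 62161 is the bytecode version uncompyle6 reports for this binary.

```python
import struct
import time

PY26_MAGIC = 62161  # bytecode version for Python 2.6, as reported by uncompyle6


def wrap_as_pyc(code_bytes, mtime=None):
    """Prefix marshalled code with a Python 2.x style pyc header.

    Header layout: 2-byte magic (little-endian) + b"\r\n" + 4-byte mtime.
    """
    if mtime is None:
        mtime = int(time.time())
    header = struct.pack("<H", PY26_MAGIC) + b"\r\n" + struct.pack("<I", mtime)
    return header + code_bytes
```

The timestamp has no effect on decompilation, so any value works when the original build time is unknown.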

With pyc files for the embedded modules in hand, uncompyle6 can decompile them. The file of main interest is __main__.pyc, which decompiles to:

# uncompyle6 version 3.9.2
# Python bytecode version base 2.6 (62161)
# Decompiled from: Python 3.12.3 (main, Nov 6 2025, 13:44:16) [GCC 13.3.0]
# Embedded file name: /usr/build/iproot/ap/ipoe/ipoe/bootstrap.py
# Compiled at: 2024-11-25 18:17:56
"""Bootstrap the execution of ipoes.

For information on how this is built with freeze.py check out
ap/ipoe/tools/build_frozen.py. There are a large number of excludes that are
important to operation.
"""
__version__ = '$Revision: #6 $'
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
import os, pkg_resources, runpy, sys, warnings, _codecs, _hashlib, sslip, bsddb185 as bsddb, zipimport, ipoe
if __name__ == '__main__':
    warnings.filterwarnings('ignore')
    ipoe.imputil.register_ipoe()
    pkg_resources.working_set.entries = []
    map(pkg_resources.working_set.add_entry, sys.path)
    if len(sys.argv) > 1:
        if sys.argv[1] == '--compat':
            sys.argv = sys.argv[2:]
        elif sys.argv[1] == '--bootstrap-testing':
            sys.argv = sys.argv[2:]
            __import__(sys.argv[0])
            sys.exit(0)
    runpy._run_module_as_main(os.path.basename(sys.argv[0]))

# okay decompiling ./__main__.pyc

The main block calls ipoe.imputil.register_ipoe to register a package loader. This function is defined in ipoe.imputil.pyc:

def register_ipoe():
    """Enable python to import ipoe packages.

    Register the ipoe importer as an available import mechanism. Note that
    since packages on sys.path might have already made it into the
    path_importer_cache (and probably stored as None since there was no
    importer at the time), we clear it to make sure we're allowed to handle
    packages.
    """
    sys.path_hooks.insert(0, IPOEImporter)
    sys.path_importer_cache.clear()
    pkg_resources.register_finder(IPOEImporter, find_in_ipoe)
    pkg_resources.register_loader_type(IPOEImporter, IPOEProvider)
    pkg_resources.register_namespace_handler(IPOEImporter, pkg_resources.file_ns_handler)
    pkg_resources.register_finder(pkg_resources.ImpWrapper, find_on_path)

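ipoe.imputil.pyc implements a custom Python package loader. Ordinary Python packages use the egg format; this loader introduces an ipoe format.

An ipoe file is itself a ZIP archive, but the Python code files inside it are encrypted. While loading an ipoe package, the loader calls the algorithm implemented in ipoe.encryption.pyc to decrypt them: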

# uncompyle6 version 3.9.2
# Python bytecode version base 2.6 (62161)
# Decompiled from: Python 3.12.3 (main, Nov 6 2025, 13:44:16) [GCC 13.3.0]
# Embedded file name: /usr/build/iproot/ap/ipoe/ipoe/encryption.py
# Compiled at: 2024-11-25 18:17:56
"""Import utilities to load ipoe packages."""
__version__ = '$Revision: #4 $'
import binascii, random, sslip2
ALGNAME = 'aes-128-cbc'
ALG_BLOCK_SIZE = 16
HASHALGNAME = 'sha1'
HASH_BLOCK_SIZE = 64
HASH_TAG_SIZE = 20
K_MAT = [
9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24,
25,
26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 12, 43,
44, 45, 46, 47, 48, 21, 49, 50, 37, 51,
52, 53, 54, 55, 56, 44, 27, 57, 58,
59, 60, 61, 62, 63, 64,
30, 65, 66, 64, 67, 19, 68, 69, 70, 71, 72, 73, 74,
75, 9, 76,
77, 78, 79, 80, 81, 82, 83, 84, 85, 70, 19, 86, 18, 71, 87, 78,
45,
56, 47, 88, 89, 90, 91, 92, 20, 93, 27, 53, 94, 14, 25, 95, 96,
93, 95,
97, 98, 34, 99, 45, 46, 100, 14, 101, 102, 75, 103, 32,
104, 105, 41, 104, 106, 107,
108, 16, 109, 110, 111, 112, 35,
72, 113, 112, 110, 51, 22, 114, 107, 63, 115, 16, 64,
45, 19,
18, 80, 60, 116, 117, 10, 82, 78, 66, 118, 119, 45, 120, 121, 122,
91, 82,
69, 123, 124, 39, 125, 126, 84, 127, 128, 122, 129, 130,
131, 48, 39, 123, 52, 132, 80,
133, 100, 134, 135, 83, 120, 24,
93, 136, 121, 16, 87, 90, 99, 137, 95, 123, 54, 138,
135, 55,
139, 28, 39, 98, 140, 107, 141, 69, 142, 102, 143, 100, 124, 144,
20, 145, 146,
28, 103, 147, 26, 148, 149, 150, 151, 152, 153,
66, 154, 111, 155, 113, 64, 125, 156, 92,
60, 90, 118, 112, 97,
157, 158, 159, 86, 160, 31, 9, 161, 28, 162, 163, 103, 164, 74,
87,
146, 147, 82, 72, 165, 166, 167, 37, 148, 168, 169, 170, 113, 171,
172, 39, 153, 31,
173, 76, 174, 28, 29, 63, 174, 82, 70, 175,
176, 129, 34, 177, 167, 171, 48, 178, 103,
168, 40, 179, 180,
181, 135, 182, 121, 127, 145, 62, 75, 183, 184, 164, 39, 165, 142,
75,
100, 185, 186, 82, 49, 187, 145, 83, 45, 119, 164, 115, 188,
189, 190, 9, 144, 166, 191,
40, 29, 31, 192, 193, 28, 194, 195,
181, 193, 115, 196, 133, 181, 141, 128, 53, 14, 147,
56, 197,
85, 198, 95, 16, 116, 199, 79, 196, 140, 58, 121, 200, 103, 26,
201, 110, 146,
187, 52, 24, 199, 202, 161, 48, 71, 53, 73, 126,
99, 47, 203, 204, 205, 41, 97, 153,
206, 111, 75, 189, 62, 122,
37, 57, 59, 186, 200, 207, 208, 27, 123, 13, 156, 159, 205,
209,
80, 183, 201, 66, 210, 182, 173, 105, 151, 134, 211, 152, 212, 89,
93, 205, 213, 85,
214, 90, 81, 187, 12, 159, 118, 36, 12, 209,
117, 71, 214, 88, 123, 142, 163, 215, 216,
110, 135, 128, 177,
217, 105, 174, 32, 218, 128, 219, 186, 204, 107, 91, 130, 220, 221,
173,
201, 68, 108, 37, 66, 95, 31, 218, 49, 89, 100, 222, 183,
88, 154, 207, 91, 77, 223,
113, 183, 96, 98, 204, 147, 34, 76,
224, 69, 60, 225, 208, 104, 104, 56, 54, 19, 90,
41, 226, 109,
226, 220, 95, 64, 162, 193, 139, 106, 227]

class IPOEDecryptError(Exception):
    pass


_PRE_KEY = None

def get_pre_key():
    global K_MAT
    global _PRE_KEY
    if _PRE_KEY:
        return _PRE_KEY
    _PRE_KEY = binascii.unhexlify(('').join(K_MAT[10:42]))
    return _PRE_KEY


_PRE_SALT = None

def get_pre_salt():
    global _PRE_SALT
    if _PRE_SALT:
        return _PRE_SALT
    _PRE_SALT = binascii.unhexlify(('').join(K_MAT[200:232]))
    return _PRE_SALT


_PRE_CTX = None

def get_pre_ctx():
    global _PRE_CTX
    if _PRE_CTX:
        return _PRE_CTX
    _PRE_CTX = binascii.unhexlify(('').join(K_MAT[300:332]))
    return _PRE_CTX


def pad(a_str, block_size):
    pad_len = block_size - len(a_str) % block_size
    return a_str + chr(pad_len) * pad_len


_GK1 = None
_GK2 = None

def get_keys():
    global _GK1
    global _GK2
    if _GK1 and _GK2:
        return (_GK1, _GK2)
    prk = sslip2.get_hmac('sha256', get_pre_salt(), get_pre_key())
    key1 = sslip2.get_hmac('sha256', prk, get_pre_ctx() + '\x01')
    key2 = sslip2.get_hmac('sha256', prk, key1 + get_pre_ctx() + '\x02')
    key1 = key1[:16]
    key2 = key2[:20]
    _GK1, _GK2 = key1, key2
    return (key1, key2)


def get_iv():
    return binascii.unhexlify('%032x' % random.getrandbits(128))


def encrypt_file(fobj):
    """Encrypt a file.

    :Parameters:
    - 'fobj' - File like object that the clear text can be read from.
    :Returns:
    - A string containing the file's encrypted data.
    """
    clear_text = fobj.read()
    if not clear_text:
        return ''
    (c_key, h_key) = get_keys()
    iv = get_iv()
    cipher = sslip2.cipher(ALGNAME, c_key, iv, encrypt=1, padding=0)
    cipher_text = cipher.update(pad(clear_text, ALG_BLOCK_SIZE))
    cipher_text = iv + cipher_text
    dig_tag = sslip2.get_hmac('sha1', h_key, cipher_text)
    return dig_tag + cipher_text


def decrypt_file(fobj):
    """Decrypt a file.

    Given a file like object to read from, decrypt the contents. Note that
    while this takes an open flie like object, it must be at the beginning of
    the file or else decryption will not work (because AES is block based).

    :Parameters:
    - 'fobj' - File like object that the encrypted contents will be read
    from.
    :Returns:
    - All data in file decrypted.
    """
    tag = fobj.read(HASH_TAG_SIZE)
    if not tag:
        return ''
    else:
        (c_key, h_key) = get_keys()
        c_text = fobj.read()
        if sslip2.get_hmac('sha1', h_key, c_text) != tag:
            raise IPOEDecryptError
        cipher = sslip2.cipher(ALGNAME, c_key, c_text[:ALG_BLOCK_SIZE], encrypt=0, padding=0)
        if fobj.name.endswith('.so'):
            p_text = sslip2.de_pad(cipher.update(c_text[ALG_BLOCK_SIZE:]), ALG_BLOCK_SIZE)
            extra_len = len(c_text) + HASH_TAG_SIZE - len(p_text)
            return p_text + chr(0) * extra_len
        return sslip2.de_pad(cipher.update(c_text[ALG_BLOCK_SIZE:]), ALG_BLOCK_SIZE)


def run_tests(burn=False, kprint=False, do_diff=False):
    global _GK1
    global _GK2
    global _PRE_CTX
    global _PRE_KEY
    global _PRE_SALT
    if burn:
        _PRE_CTX = _PRE_SALT = _PRE_KEY = None
        _GK1 = _GK2 = None
    (key1, key2) = get_keys()
    if kprint:
        print 'key1:', binascii.hexlify(key1)
        print 'key2:', binascii.hexlify(key2)
    with open('/etc/hosts', 'r') as hosts_f:
        crypted = encrypt_file(hosts_f)
    with open('/etc/hosts', 'r') as hosts_f:
        pre_lines = hosts_f.readlines()
    with open('/tmp/ipoe_test', 'w') as test_f:
        test_f.write(crypted)
    with open('/tmp/ipoe_test', 'r') as crypt_f:
        clear = decrypt_file(crypt_f)
    if do_diff:
        import difflib
        pre_lines = [_[5] for line in pre_lines]
        for line in difflib.context_diff(pre_lines, clear.splitlines()):
            print line

    return


def run_decr():
    with open('/etc/hosts', 'r') as hosts_f:
        crypted = encrypt_file(hosts_f)
    with open('/etc/hosts', 'r') as hosts_f:
        pre_lines = hosts_f.readlines()
    with open('/tmp/ipoe_test', 'w') as test_f:
        test_f.write(crypted)
    for _ in range(10000):
        with open('/tmp/ipoe_test', 'r') as crypt_f:
            clear = decrypt_file(crypt_f)
        del clear


if __name__ == '__main__':
    run_tests(kprint=True, do_diff=True)
    bu = binascii.unhexlify
    k1 = bu('0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b')
    d1 = 'Hi There'
    k2 = 'key'
    d2 = 'The quick brown fox jumps over the lazy dog'
    test_mat = [
        [
            k1, d1, 'sha224', bu('896fb1128abbdf196832107cd49df33f47b4b1169912ba4f53684b22')],
        [
            k1, d1, 'sha256', bu('b0344c61d8db38535ca8afceaf0bf12b881dc200c9833da726e9376c2e32cff7')],
        [
            k1, d1, 'sha384',
            bu('afd03944d84895626b0825f4ab46907f15f9dadbe4101ec682aa034c7cebc59c' + 'faea9ea9076ede7f4af152e8b2fa9cb6')],
        [
            k1, d1, 'sha512',
            bu('87aa7cdea5ef619d4ff0b4241a1d6cb02379f4e2ce4ec2787ad0b30545e17cde' + 'daa833b7d6b8a702038b274eaea3f4e4be9d914eeb61f1702e696c203a126854')],
        [
            k2, d2, 'md5', bu('80070713463e7749b90c2dc24911e275')],
        [
            k2, d2, 'sha1', bu('de7c9b85b8b78aa6bc8a7a36f70a90701c9db4d9')],
        [
            k2, d2, 'sha256', bu('f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8')]]
    for quad in test_mat:
        if sslip2.get_hmac(quad[2], quad[0], quad[1]) != quad[3]:
            print 'ERROR!', quad

    if sslip2.de_pad('\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07', 10) != '\x07\x07\x07':
        print 'ERROR! de_pad 1'
    if sslip2.de_pad('\x02\x02\x02\x02\x02\x02\x02\x02\x02\x02', 10) != '\x02\x02\x02\x02\x02\x02\x02\x02':
        print 'ERROR! de_pad 2'
    try:
        sslip2.de_pad('\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07', 4)
    except sslip2.Error:
        pass
    else:
        print 'ERROR! de_pad 3'

# okay decompiling ipoe.encryption.pyc

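An encrypted file is first integrity-checked with HMAC-SHA1 and then decrypted with AES. The keys are generated as in the code above, and the IV is read from the encrypted file.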
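The key derivation in get_keys() can be reproduced with the standard library alone. The sketch below is Python 3 and assumes sslip2.get_hmac takes (algorithm, key, data), which is the argument order the module's own self-test uses; get_hmac and derive_keys here are stand-ins, not Cisco's code. The extract-then-expand shape resembles HKDF (RFC 5869) with SHA-256, though the article does not name it as such.

```python
import hashlib
import hmac


def get_hmac(alg, key, data):
    """Stand-in for sslip2.get_hmac(alg, key, data), built on the stdlib."""
    return hmac.new(key, data, getattr(hashlib, alg)).digest()


def derive_keys(pre_key, pre_salt, pre_ctx):
    """Mirror get_keys(): return (16-byte cipher key, 20-byte HMAC key)."""
    prk = get_hmac("sha256", pre_salt, pre_key)
    block1 = get_hmac("sha256", prk, pre_ctx + b"\x01")
    # The second block is chained on the full 32-byte first block,
    # before that block is truncated to the cipher key.
    block2 = get_hmac("sha256", prk, block1 + pre_ctx + b"\x02")
    return block1[:16], block2[:20]
```

The three inputs correspond to get_pre_key(), get_pre_salt(), and get_pre_ctx(); their actual values have to be recovered from the binary, because the decompiled K_MAT listing does not contain them in usable form.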

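Using the code above as a reference, we wrote a local decryption script (handle_ipoe.py). After batch-processing all ipoe packages, the patch diff can begin.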
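As an illustration of the container layout that decrypt_file expects (20-byte tag, 16-byte IV, ciphertext), here is a minimal Python 3 sketch. It is not the author's handle_ipoe.py: the standard library has no AES, so the AES-128-CBC step is passed in as a caller-supplied function, and the strict padding check in de_pad is an assumption about how sslip2.de_pad behaves.

```python
import hashlib
import hmac

HASH_TAG_SIZE = 20   # HMAC-SHA1 tag length, as in encryption.py
ALG_BLOCK_SIZE = 16  # AES block size, also the IV length


def split_and_verify(blob, h_key):
    """Check the HMAC-SHA1 tag over IV + ciphertext, then return (iv, ciphertext)."""
    tag, body = blob[:HASH_TAG_SIZE], blob[HASH_TAG_SIZE:]
    expected = hmac.new(h_key, body, hashlib.sha1).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("HMAC mismatch")
    return body[:ALG_BLOCK_SIZE], body[ALG_BLOCK_SIZE:]


def de_pad(data, block_size):
    """Strip the padding added by pad() in encryption.py."""
    pad_len = data[-1]
    if not 1 <= pad_len <= block_size or data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("bad padding")
    return data[:-pad_len]


def decrypt_member(blob, c_key, h_key, aes_cbc_decrypt):
    """Decrypt one archive member.

    `aes_cbc_decrypt(key, iv, data)` is supplied by the caller, for example
    a thin wrapper around a third-party AES implementation.
    """
    iv, ciphertext = split_and_verify(blob, h_key)
    return de_pad(aes_cbc_decrypt(c_key, iv, ciphertext), ALG_BLOCK_SIZE)
```

The special case for .so members in decrypt_file, which re-pads the plaintext with NUL bytes to the original length, is left out of this sketch.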

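Patch Diffing

We compare with Beyond Compare, focusing on the lib/python2.6_13_amd64_nothr/site-packages directory, since most ipoe packages live there.

A few things need special attention. Because the Python code is decompiled, and uncompyle6 writes information such as the source compile time into each file header, Beyond Compare flags some files as different even though nothing changed. Many other files differ only in a version string. To narrow the comparison, these differences can be marked as unimportant text with exception rules in Beyond Compare: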

# Compiled at.*\s
//prod/phoebe-16-0.*\s
__CHEETAH_genTime__ =.*
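The same filtering can be approximated outside Beyond Compare. A minimal sketch in Python 3, assuming the decompiled files are read as text; NOISE, normalize, and differs_meaningfully are hypothetical names, not part of any tool mentioned here:

```python
import re

# The three exception rules above, compiled as Python regular expressions.
NOISE = [
    re.compile(r"# Compiled at.*\s"),
    re.compile(r"//prod/phoebe-16-0.*\s"),
    re.compile(r"__CHEETAH_genTime__ =.*"),
]


def normalize(text):
    """Remove every fragment matched by one of the noise patterns."""
    for pattern in NOISE:
        text = pattern.sub("", text)
    return text


def differs_meaningfully(old_text, new_text):
    """True when two decompiled files differ outside the noise patterns."""
    return normalize(old_text) != normalize(new_text)
```

Running differs_meaningfully over each pair of same-named files in the two trees would leave only the files whose logic changed.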

This narrows the comparison considerably, and one key change turns up in enduser_site_packages/aquarium/screen/AppController.py:

# 16.0.1.017
elif form.has_key('auth'):
    if not self.specialLogin():
        _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))


# 16.0.4.016
elif form.has_key('auth'):
    if self._ctx.form.get('serial', None):
        del form['serial']
    try:
        if self._ctx.form.get('auth') is not None:
            ui_checker.is_md5Hash(self._ctx.form.get('auth', ''))
        else:
            raise
    except ui_checker.FilterError, e:
        coro.print_stderr('Error: %s' % str(e))
        _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))
    except Exception, e:
        coro.print_stderr('Exception ocurred in validating auth code. Error: %s' % str(e))
        _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))
    else:
        if not self.specialLogin():
            _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))

The new code deletes the serial parameter as soon as it sees one, and the auth parameter has to pass is_md5Hash validation:

def is_md5Hash(s):
    if len(s) != 32:
        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)
    try:
        int(s, 16)
    except ValueError:
        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)

    return True

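A newly added parameter check suggests this is probably where the vulnerability lies, and the code sits at the entry of the login feature, reachable without any privileges. Looking at the logic in detail: this branch is entered when the request contains an auth parameter. The old version calls specialLogin directly, which eventually reaches the checkHash function in CoroSessionContainer.py: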

def specialLogin(self):
    (sid, username) = self.specialAuthenticate()
    if not (sid is None or username is None):
        qlog.write('EUQ.GUI_AUTH_OK', username)
        self._ctx.session.logout()
        self._ctx.session.login(username, '', is_admin=True, force=True, params={'login': username})
        return True
    else:
        return False

def specialAuthenticate(self):
    return self._ctx.session.checkHash(self._ctx.form['auth'], self._ctx.form.get('serial', None))


def checkHash(self, hash, serial):
    res = (None, None)
    cc = currentContext().commandd
    try:
        res = cc.call(('gui.gui', ), 'check_hash', hash, serial)[0][1]
    except:
        qlog.write('COMMON.APP_FAILURE', str(coro.compact_traceback()))

    return res

Both hash and serial passed into checkHash are attacker-controlled. The code then calls commandd's call function, defined in the command_client class in command_client.py:

def call(self, modules, method, args, destination=None, timeout=0, tag=None):
    message_data = []
    is_py3_module = False
    if modules in PY3_modules:
        is_py3_module = True
    for module in modules:
        message_data.append((module, (method, args)))

    message = self.send_message(Commandment.MSG_CALL, message_data, destination=destination, timeout=timeout, tag=tag)
    result = cPickle.loads(message)
    for (modname, value) in result:
        if isinstance(value, Exception):
            raise value

    if is_py3_module == True:
        encoded_result = encoded_string.cvt_to_py2types(result)
        return encoded_result
    else:
        return result

Note that the fourth parameter of this function (not counting self) is destination, whereas other call sites in the code invoke cc.call like this:

topin = self._cc.call(('hermes.imh', ), 'topin', (num_hosts,))[0][1]

checkHash, by contrast, supplies a fourth positional argument, so serial is used directly as the destination parameter, which can cause problems.
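The difference between the two calling styles is easy to see with a stub that has the same parameter order. This is an illustrative Python 3 sketch; the stub only reports how its arguments were bound and sends nothing, and the argument values are placeholders.

```python
def call(modules, method, args, destination=None, timeout=0, tag=None):
    """Same parameter order as command_client.call (minus self); reports the binding."""
    return {"modules": modules, "method": method, "args": args, "destination": destination}


# Style used elsewhere in the code base: all arguments wrapped in one tuple.
usual = call(("hermes.imh",), "topin", (10,))

# Style used by checkHash: two bare positionals after the method name.
odd = call(("gui.gui",), "check_hash", "auth-value", "serial-value")
```

In the first call destination keeps its default of None; in the second, the value that came from the serial form field lands in destination.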

call invokes send_message; part of its code is shown below:

def send_message(self, commandment_msg, data='', source=None, destination=None, ttl=0, expected=AUTO_EXPECT, timeout=0, tag=None):
    if not self.is_connected():
        raise Commandment.NotConnectedError()
    if source is None:
        source = tags.serial()
    if destination is None:
        destination = self.destination
    if type(data) != types.StringType:
        data = cPickle.dumps(data, 1)
    if tag is None:
        _tag = self.tag
    else:
        _tag = tag
    old_int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal2.sigprocmask(signal2.SIG_BLOCK, signal2.SigSet(signal.SIGWINCH))
    try:
        try:
            CommandMessage.send_message(self._socket.send, commandment_msg, source, destination, data, ttl=ttl, timeout=timeout, tag=_tag)
            # ...

    return

This in turn calls CommandMessage.send_message. Note that by this point the data argument has already been serialized with cPickle.

# CommandMessage.send_message
def send_message(write_method, message_type, source, destination='', message='', ttl=0, timeout=0, tag=None):
    header = struct.pack('>BBIIBB32s', Commandment.MESSAGE_VERSION, ttl, len(message), message_type, len(source), len(destination), _message_tag(tag))
    if timeout:
        coro.with_timeout(timeout, write_method, header + source + destination)
        for x in xrange(0, len(message), MAX_PACKET_SIZE):
            coro.with_timeout(timeout, write_method, message[x:x + MAX_PACKET_SIZE])

    else:
        packet = header + source + destination + message
        write_method(packet)

CommandMessage.send_message packs the header with the struct module, appends the remaining data, and finally sends everything through self._socket.send. Given how command_client is initialized, the data goes to /tmp/commandd.mainsock.

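This function has a flaw: when packing the header it uses the format code B (one unsigned byte) for the length of destination, yet destination (that is, serial) has no actual length limit. If a destination longer than 255 bytes is passed in, struct.pack overflows the integer and the destination length field wraps around to a smaller value.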
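The wraparound amounts to keeping only the low 8 bits of the length. The Python 3 sketch below models that with an explicit mask, because the wrap described here happens under the appliance's Python 2.6 interpreter, whereas current Python 3 raises struct.error for an out-of-range B value. wrapped_length_byte and python3_rejects are hypothetical helper names.

```python
import struct

HEADER = ">BBIIBB32s"  # format string used by CommandMessage.send_message


def wrapped_length_byte(destination):
    """Value left in the one-byte DST_LEN field once the length is masked to 8 bits."""
    return len(destination) & 0xFF


def python3_rejects(length):
    """Return True when modern struct.pack refuses to pack `length` as 'B'."""
    try:
        struct.pack(">B", length)
    except struct.error:
        return True
    return False
```

For example, a 0x100-byte destination yields a length byte of 0, and a 0x105-byte one yields 5.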

CommandServer.py listens on /tmp/commandd.mainsock and calls read_message to read incoming data:

while 1:
    if self._has_locks():
        timeout = self.held_lock_timeout
    else:
        timeout = 0
    try:
        (txn_tag, ttl, message_type, source, destination, message) = CommandMessage.read_message(self.s.recv, timeout=timeout)
    except coro.TimeoutError:
        P(None, 'Naughty process taking too long while it holds a lock.\n', T_STDERR)
        raise

    if source in cluster.config.ecluster_serial_nos:
        coro.print_stderr('received message from inter-cluster: function called %s \n' % str(CommandSession.messages[message_type]))
        self.process_cluster_connection_updates(txn_tag)
    if source and source not in cluster.config.ecluster_serial_nos and not self.ok_source_serial(source):
        e = Commandment.UnknownSerialError(source, from_serial=source)
        self.send_error(e, source, txn_tag)
    elif destination and destination != my_serial_number:
        if cluster.util.is_cluster_member():
            self.forward_cluster_message(ttl, message_type, source, destination, message, txn_tag)
        else:
            e = Commandment.UnknownSerialError(source, to_serial=destination)
            self.send_error(e, source, txn_tag)
    else:
        try:
            fun = CommandSession.messages[message_type]
        except KeyError:
            e = Commandment.UnknownCommandError(message_type, from_serial=source)
            self.send_error(e, source, txn_tag)
        else:
            fun(self, message, source, destination, txn_tag)

CommandMessage.read_message is implemented as follows:

def read_message(read_method, timeout=0):
    header = _read(read_method, Commandment.HEADER_LENGTH, timeout)
    try:
        (version, ttl, message_length, message_type, source_length, destination_length, txn_tag) = struct.unpack(Commandment.HEADER, header)
    except struct.error:
        raise Commandment.MessageFormatError()

    if version != Commandment.MESSAGE_VERSION:
        try:
            more_data = _read(read_method, 10240, 3)
        except:
            more_data = ''
        else:
            coro.print_stderr('*************** Corrupt data reading message:\n' + header + more_data + '\n')
            coro.print_stderr('header: ' + binascii.hexlify(header))
            raise Commandment.MessageVersionError(version, Commandment.MESSAGE_VERSION)
    source = _read(read_method, source_length, timeout)
    if destination_length:
        destination = _read(read_method, destination_length, timeout)
    else:
        destination = ''
    message = _read(read_method, message_length, timeout)
    return (txn_tag.rstrip('\x00'), ttl, message_type, source, destination, message)

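It first parses the header to get the length of each field, then reads the fields in order. Based on this implementation of the Command protocol, a normal request is laid out as follows: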

+--------+--------+--------+--------+ -+
| VER | ttl | msg_len | |
+--------+--------+--------+--------+ |
| msg_len | msg_type | |
+--------+--------+--------+--------+ |
| msg_type | SRC_LEN| DST_LEN| | --> HEADER
+--------+--------+--------+--------+ |
| TAG (32 BYTES) | |
+--------+--------+--------+--------+ |
| ... | |
+--------+--------+--------+--------+ -+
| SOURCE (SRC_LEN BYTES) |
+--------+--------+--------+--------+
| ... |
+--------+--------+--------+--------+
| DEST (DST_LEN BYTES) | --> BODY
+--------+--------+--------+--------+
| ... |
+--------+--------+--------+--------+
| MESSAGE (msg_len BYTES) |
+--------+--------+--------+--------+
| ... |
+--------+--------+--------+--------+

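Because CommandMessage.send_message does not limit the length of destination when packing the header, and DST_LEN is packed as B, passing a destination longer than 0xff desynchronizes the reader. Suppose len(destination) = 0x100: after packing, DST_LEN = 0x00, so CommandMessage.read_message concludes that the request carries no destination and goes on to read msg_len bytes as the message. What it actually reads is the region occupied by the 0x100-byte destination.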
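The desynchronization can be reproduced offline with inert data. The following Python 3 sketch frames a message the way send_message does and reads it back in the order read_message uses. It is a simplified model, not the appliance code: MESSAGE_VERSION and the message type 3 are placeholders (the real constants are not shown in this article), build_packet and parse_packet are hypothetical names, and the one-byte lengths are masked explicitly to model the wrap.

```python
import io
import struct

HEADER = ">BBIIBB32s"
HEADER_LENGTH = struct.calcsize(HEADER)  # 44 bytes
MESSAGE_VERSION = 1  # placeholder value, assumed for illustration


def build_packet(message_type, source, destination, message, tag=b""):
    """Frame a message like send_message, with lengths masked to one byte."""
    header = struct.pack(
        HEADER, MESSAGE_VERSION, 0, len(message), message_type,
        len(source) & 0xFF, len(destination) & 0xFF, tag,
    )
    return header + source + destination + message


def parse_packet(packet):
    """Read the fields back in the same order as read_message."""
    stream = io.BytesIO(packet)
    fields = struct.unpack(HEADER, stream.read(HEADER_LENGTH))
    _version, _ttl, message_length, _type, source_length, destination_length, _tag = fields
    source = stream.read(source_length)
    destination = stream.read(destination_length)
    message = stream.read(message_length)
    return source, destination, message


genuine = b"M" * 40        # what the sender intended as the message
oversized = b"D" * 0x100   # inert filler, exactly 0x100 bytes long
source, destination, message = parse_packet(
    build_packet(3, b"SERIAL", oversized, genuine)
)
```

After parsing, destination is empty and message holds the first 40 bytes of the oversized destination field rather than the genuine message, which is the misalignment described above.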

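After parsing and reading the data, the code checks whether source and destination match the system serial number. source comes from tags.serial() and passes validation. destination has length 0, so its check is skipped entirely.

The code then dispatches on msg_type to the handler for MSG_CALL, passing message as an argument: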

def call(self, message, source, destination, txn_tag):
    __pychecker__ = 'unusednames=destination,unused'
    modules_and_requests = cPickle.loads(message)
    # ...

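The MSG_CALL handler calls cPickle.loads directly on the message data. Normally message is the output of cPickle.dumps, but because of the header length parsing problem, what gets deserialized here is attacker-controlled data, resulting in a deserialization vulnerability.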
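Why unpickling untrusted bytes amounts to code execution can be shown with a harmless example. In this Python 3 sketch (using pickle, the modern counterpart of cPickle), the Demo class is hypothetical and the callable is the built-in len, chosen because it has no side effects:

```python
import pickle


class Demo(object):
    """On unpickling, pickle calls len("abc") instead of rebuilding a Demo."""

    def __reduce__(self):
        # pickle stores "call this callable with these arguments" and
        # performs that call at load time.
        return (len, ("abc",))


result = pickle.loads(pickle.dumps(Demo()))
```

Here result is the integer 3 rather than a Demo instance. The callable is chosen by whoever produced the bytes, which is why the Python documentation warns against unpickling data from untrusted sources.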

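Exploitation

With the root cause understood, exploiting the vulnerability is fairly straightforward. Send a request to the login endpoint containing both the auth and serial parameters, with the auth parameter the same length as the deserialization payload.

The length of the serial parameter should be a multiple of 0x100, so that the DST_LEN sent to commandd.mainsock is 0. serial is composed of the deserialization payload followed by arbitrary padding up to a multiple of 0x100.

read_message then reads the wrong message, and the subsequent call deserializes attacker-controlled data. Since CommandServer runs as root, the command execution resulting from deserialization is also root, which matches the advisory.

Because this vulnerability is being exploited in the wild, affected users are advised to update to the latest version as soon as possible and to check whether their systems have already been compromised.

AI Automated Analysis (🤖)

This analysis used Claude Code + deepseek-reasoner, with the following directory layout: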

CVE-2025-20393 --+
|--- 16.0.1.017
|--- 16.0.4.016

Input prompt (no further supporting information was provided):

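I am trying to reproduce CVE-2025-20393, a recent unauthenticated remote code execution vulnerability in the Cisco email security gateway. The vulnerability is described as follows: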
A vulnerability in the Spam Quarantine feature of Cisco AsyncOS Software for Cisco Secure Email Gateway and Cisco Secure Email and Web Manager could allow an unauthenticated, remote attacker to execute arbitrary system commands on an affected device with root privileges. This vulnerability is due to insufficient validation of HTTP requests by the Spam Quarantine feature. An attacker could exploit this vulnerability by sending a crafted HTTP request to the affected device. A successful exploit could allow the attacker to execute arbitrary commands on the underlying operating system with root privileges.
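You can find more detailed information about this vulnerability at https://sec.cloudapps.cisco.com/security/center/content/CiscoSecurityAdvisory/cisco-sa-sma-attack-N9bf4.
I have obtained the key system files of versions 16.0.1.017 and 16.0.4.016 of the Cisco email security gateway. They are in the current directory: the 16.0.1.017 directory is the vulnerable version and the 16.0.4.016 directory is the version in which the vulnerability has been fixed.
The key code related to the Spam Quarantine feature is under 16.0.1.017\phoebe-16-0-1-017.1736506704\lib\python2.6_13_amd64_nothr\site-packages and 16.0.4.016\phoebe-16-0-4-016.1767883144\lib\python2.6_13_amd64_nothr\site-packages, where enduser and enduser_site_packages are the modules to focus on. You only need to look at .py files and can ignore any compiled files (such as .pyc and .pyo).
To analyze this vulnerability, I suggest comparing what changed between the two versions of the code base. Note that since the .py files were almost all produced by decompilation, the beginning of each file may contain the source compile time. If two files are identical apart from the compile time, the file should not be treated as changed or analyzed in depth. Some .py files also differ only in version number with no change in logic, and these should not be analyzed in depth either. You need to find a way to filter out these irrelevant changes and concentrate on changes that may be closely tied to the vulnerability.
When you find a likely vulnerable spot, analyze the root cause in depth: work out why the new version made the change and what concrete impact the vulnerability has, then write a detailed vulnerability analysis report in Markdown and save it as CVE-2025-20393.md in the current directory.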

Output document:

# CVE-2025-20393 Vulnerability Analysis Report

## Overview

**CVE ID**: CVE-2025-20393
**Vulnerability Type**: Unauthenticated Remote Code Execution
**Affected Product**: Cisco AsyncOS Software for Cisco Secure Email Gateway and Cisco Secure Email and Web Manager
**Affected Version**: 16.0.1.017 (and earlier versions)
**Patched Version**: 16.0.4.016
**Vulnerability Description**: A vulnerability in the Spam Quarantine feature could allow an unauthenticated, remote attacker to execute arbitrary system commands on an affected device with root privileges. This vulnerability is due to insufficient validation of HTTP requests by the Spam Quarantine feature.

## Root Cause Analysis

### Vulnerability Location

The vulnerability exists in the authentication mechanism of the Spam Quarantine web interface, specifically in the `AppController.py` file located at:

```
enduser_site_packages/aquarium/screen/AppController.py
```

### Vulnerable Code Path

1. **HTTP Request Handling**: The `_handleNoLoginAuthentication()` method processes authentication requests without proper validation.
2. **Authentication Parameter**: The `auth` parameter from the HTTP request form data is directly used without sanitization.
3. **Authentication Flow**: When `form.has_key('auth')` is true, the code calls `specialLogin()` which in turn calls `specialAuthenticate()`.
4. **RPC Chain**: `specialAuthenticate()` calls `checkHash()` with the raw `auth` parameter, initiating an RPC call chain that eventually reaches system-level operations.

### Code Analysis

#### Vulnerable Code (16.0.1.017)

```python
def specialAuthenticate(self):
    return self._ctx.session.checkHash(self._ctx.form['auth'], self._ctx.form.get('serial', None))

def _handleNoLoginAuthentication(self):
    # ... other code ...
    elif form.has_key('auth'):
        if not self.specialLogin():
            _reject_login_admin(_('An error occurred during authentication...'))
```

#### Security Issue

The `auth` parameter is extracted directly from `self._ctx.form['auth']` without any validation. This parameter is intended to be an MD5 hash (32 hexadecimal characters) but the vulnerable version does not enforce this format. An attacker could potentially inject malicious payload containing shell metacharacters (e.g., `;`, `|`, `&`, `$()`, backticks) that might be interpreted as commands if the parameter is passed to a shell execution function.

### Attack Vector

An unauthenticated attacker can send a crafted HTTP request to the Spam Quarantine interface with a malicious `auth` parameter. If this parameter reaches a shell command without proper sanitization, arbitrary commands could be executed with root privileges.

## Patch Analysis

### Security Fix (16.0.4.016)

#### Primary Fix: Input Validation in AppController.py

The patched version adds strict MD5 hash format validation before processing the `auth` parameter:

```python
elif form.has_key('auth'):
    if self._ctx.form.get('serial', None):
        del form['serial']
    try:
        if self._ctx.form.get('auth') is not None:
            ui_checker.is_md5Hash(self._ctx.form.get('auth', ''))
        else:
            raise
    except ui_checker.FilterError, e:
        coro.print_stderr('Error: %s' % str(e))
        _reject_login_admin(_('An error occurred during authentication...'))
    except Exception, e:
        coro.print_stderr('Exception ocurred in validating auth code. Error: %s' % str(e))
        _reject_login_admin(_('An error occurred during authentication...'))
    else:
        if not self.specialLogin():
            _reject_login_admin(_('An error occurred during authentication...'))
```

#### New Validation Function: is_md5Hash() in ui_checker.py

The patch introduces a new validation function that strictly enforces MD5 hash format:

```python
def is_md5Hash(s):
    if len(s) != 32:
        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)
    try:
        int(s, 16)
    except ValueError:
        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)
    return True

validate_md5_format = _("MD5 Hash must be 32 characters long and can contain only '0-9' and 'a-f'\nsymbols.")
```

#### Validation Logic

1. **Length Check**: Ensures the input is exactly 32 characters long (standard MD5 hash length).
2. **Character Set Validation**: Verifies that all characters are hexadecimal (0-9, a-f, A-F) by attempting to convert to integer base 16.
3. **Error Handling**: Throws `FilterError` with appropriate error messages if validation fails.

### Secondary Changes

1. **Additional Imports**: `AppController.py` now imports `ui_checker` module for validation functions.
2. **Error Logging**: Added proper error logging using `coro.print_stderr()` for debugging and monitoring.
3. **Defensive Programming**: Added check for `None` values in the `auth` parameter.

## Impact Assessment

### Potential Consequences

1. **Remote Code Execution**: Successful exploitation could allow attackers to execute arbitrary commands with root privileges on the affected device.
2. **Complete System Compromise**: Root access would enable attackers to install malware, exfiltrate data, modify configurations, or use the device as a pivot point in the network.
3. **Authentication Bypass**: The vulnerability could potentially allow bypassing authentication mechanisms to access quarantined messages or administrative functions.
4. **Denial of Service**: Malicious commands could disrupt email security services or cause system instability.

### Affected Components

- Spam Quarantine web interface authentication
- End-user quarantine access
- Administrative quarantine management
- Related RPC services in the authentication chain

## Technical Details

### File Changes Summary

| File Path | Change Type | Description |
| -------------------------------------------------------- | ------------ | ------------------------------------------------------- |
| `enduser_site_packages/aquarium/screen/AppController.py` | Security Fix | Added MD5 hash validation for `auth` parameter |
| `phoebe/ui_checker.py` | New Feature | Added `is_md5Hash()` validation function |
| Various template files | Minor | Updated compilation timestamps (irrelevant to security) |

### Key Security Principles Addressed

1. **Input Validation**: The fix implements strict allow-list validation (accepting only valid MD5 hashes) rather than attempting to filter malicious characters.
2. **Defense in Depth**: Validation occurs at the entry point before the parameter enters the authentication flow.
3. **Fail-Safe Defaults**: Invalid input causes authentication to fail with appropriate error messages.
4. **Principle of Least Privilege**: Although not directly addressed, preventing command injection limits potential damage.

## Exploitation Scenarios

### Theoretical Attack Chain

1. Attacker crafts an HTTP request to the Spam Quarantine interface with a malicious `auth` parameter containing shell commands.
2. The malicious payload bypasses validation in the vulnerable version.
3. The payload propagates through the authentication RPC chain.
4. If any component in the chain passes the parameter to a shell command without proper escaping, the attacker's commands execute with root privileges.
5. Attacker gains full control over the email security appliance.

### Mitigation Factors

- The exact command injection point is not publicly disclosed, but the validation fix prevents exploitation by restricting input to safe characters.
- Network access to the Spam Quarantine interface is required for exploitation.

## Recommendations

### For Users

1. **Immediate Action**: Upgrade to version 16.0.4.016 or later.
2. **Network Segmentation**: Restrict access to management interfaces to authorized networks only.
3. **Monitoring**: Review logs for suspicious authentication attempts or unusual system commands.

### For Developers

1. **Input Validation**: Always validate user input using allow-list approaches.
2. **Secure Coding Practices**: Avoid passing user input directly to shell commands; use parameterized APIs when possible.
3. **Security Testing**: Implement comprehensive security testing including command injection tests.
4. **Principle of Defense in Depth**: Validate input at multiple layers - client-side, server-side, and in business logic.
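
As an illustration of the "parameterized APIs" recommendation above, here is a minimal Python sketch. It is not taken from the appliance code: the helper name `run_echo` and the child command are hypothetical and only demonstrate that a value passed as a separate `argv` element is never interpreted by a shell.

```python
import subprocess
import sys


def run_echo(user_value):
    """Echo user_value through a child process without involving a shell.

    The value is handed over as one argv element, so characters such as
    ';' or '|' reach the child as literal text (hypothetical helper).
    """
    cmd = [sys.executable, "-c", "import sys; print(sys.argv[1])", user_value]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.rstrip("\n")
```

Calling `run_echo("abc; echo injected")` returns the string unchanged, whereas building a command line by string concatenation and running it with `shell=True` would let the shell treat `;` as a command separator.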

## Conclusion

CVE-2025-20393 represents a critical command injection vulnerability in Cisco's Spam Quarantine feature. The vulnerability stems from insufficient validation of the `auth` parameter in HTTP requests, potentially allowing attackers to inject and execute arbitrary commands with root privileges.

The security fix implements robust MD5 hash format validation, ensuring that the `auth` parameter contains only expected hexadecimal characters before further processing. This validation effectively neutralizes command injection attacks by restricting input to a safe character set.

This case highlights the importance of proper input validation in security-critical authentication flows and demonstrates how relatively simple validation mechanisms can prevent serious vulnerabilities.

## References

1. Cisco Security Advisory: https://sec.cloudapps.cisco.com/security/center/content/CiscoSecurityAdvisory/cisco-sa-sma-attack-N9bf4
2. National Vulnerability Database: https://nvd.nist.gov/vuln/detail/CVE-2025-20393
3. Common Weakness Enumeration: CWE-78 (Improper Neutralization of Special Elements used in an OS Command)

## Appendix: Code Diffs

### AppController.py Key Changes

```diff
--- 16.0.1.017/AppController.py
+++ 16.0.4.016/AppController.py
@@ -47,7 +47,7 @@
from command_client import command_client
from Commandment import DaemonUnresponsiveError
from shared import Branding
-import features, qlog, variable_store, slbl.exceptions
+import coro, features, qlog, ui_checker, variable_store, slbl.exceptions
from bsddb3.db import DBError
from shared.SLBL import SLBL
import euq_vars, euq.access.access as access, euq.access.auth_base as auth_base
@@ -230,8 +230,23 @@
elif not self.urlLogin():
_reject_login_enduser(_('An error occurred during authentication. Please contact your system\nadministrator.'))
elif form.has_key('auth'):
- if not self.specialLogin():
+ if self._ctx.form.get('serial', None):
+ del form['serial']
+ try:
+ if self._ctx.form.get('auth') is not None:
+ ui_checker.is_md5Hash(self._ctx.form.get('auth', ''))
+ else:
+ raise
+ except ui_checker.FilterError, e:
+ coro.print_stderr('Error: %s' % str(e))
_reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))
+ except Exception, e:
+ coro.print_stderr('Exception ocurred in validating auth code. Error: %s' % str(e))
+ _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))
+ else:
+ if not self.specialLogin():
+ _reject_login_admin(_('An error occurred during authentication. Please contact your system\nadministrator or, if you are the System Administrator, view logs for details.'))
+ return
```

### ui_checker.py New Function

```diff
+def is_md5Hash(s):
+    if len(s) != 32:
+        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)
+    try:
+        int(s, 16)
+    except ValueError:
+        raise FilterError('is_md5Hash', validate_md5_format, s[:80] if s else s)
+    return True
+
+validate_md5_format = _("MD5 Hash must be 32 characters long and can contain only '0-9' and 'a-f'\nsymbols.")
```
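
The check above combines a length test with `int(s, 16)`. In CPython 3, `int(..., 16)` also accepts a leading sign, a `0x` prefix, surrounding whitespace and uppercase digits. It is assumed here, not verified, that the appliance's Python 2.6 behaves the same way. The sketch below compares that style of check with a stricter regular-expression allow-list. The names `loose_check` and `is_strict_md5` are hypothetical, and this is a possible tightening, not a claim that the patch can be bypassed.

```python
import re

# Exactly 32 lowercase hexadecimal digits, nothing else.
_MD5_HEX = re.compile(r"[0-9a-f]{32}")


def loose_check(value):
    """Mimic the patched logic: length of 32 plus a successful int(value, 16)."""
    if len(value) != 32:
        return False
    try:
        int(value, 16)
    except ValueError:
        return False
    return True


def is_strict_md5(value):
    """Accept only strings made of 32 characters from 0-9 and a-f."""
    return isinstance(value, str) and _MD5_HEX.fullmatch(value) is not None
```

A value such as `"+" + "a" * 31` passes `loose_check` but is rejected by `is_strict_md5`, while a genuine lowercase digest passes both.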

---

*Report generated on 2026-02-05*
*Analysis based on Cisco AsyncOS versions 16.0.1.017 (vulnerable) and 16.0.4.016 (patched)*

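The AI appears to have located the vulnerable code, but it did not dig further to find out how command execution is actually achieved (the root cause). Based only on the check that the new version adds for this parameter, it inferred that the parameter might allow command injection.

With a more capable model, a better-designed agent architecture or tooling, or optimized prompts, the AI might be able to produce a correct and complete report on its own.

This analysis consumed 66k tokens and cost ¥1.30. The AI also wrote the Python script used for the diff by itself: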

```python
#!/usr/bin/env python3
import os
import sys
import difflib
import re

def normalize_line(line):
    """Remove timestamp and version strings that are likely to differ."""
    # Remove decompiler timestamp lines
    if line.startswith('# uncompyle6 version'):
        return '# uncompyle6 version'
    if line.startswith('# Python bytecode version'):
        return '# Python bytecode version'
    if line.startswith('# Decompiled from:'):
        return '# Decompiled from:'
    if line.startswith('# Embedded file name:'):
        return '# Embedded file name:'
    if line.startswith('# Compiled at:'):
        return '# Compiled at:'
    # Remove version strings like $Id: ... $
    line = re.sub(r'\$Id: .*?\$', '$Id$', line)
    # Remove version numbers like 16.0.1.017 or 16.0.4.016
    line = re.sub(r'\b\d+\.\d+\.\d+\.\d+\b', 'VERSION', line)
    # Remove specific build numbers like phoebe-16-0-1-017.1736506704
    line = re.sub(r'phoebe-\d+-\d+-\d+-\d+\.\d+', 'phoebe-VERSION', line)
    return line

def compare_files(file1, file2):
    """Compare two files ignoring irrelevant differences."""
    with open(file1, 'r', encoding='utf-8', errors='ignore') as f1:
        lines1 = [normalize_line(l.rstrip()) for l in f1]
    with open(file2, 'r', encoding='utf-8', errors='ignore') as f2:
        lines2 = [normalize_line(l.rstrip()) for l in f2]

    diff = list(difflib.unified_diff(lines1, lines2, lineterm=''))
    return diff

def find_python_files(root):
    """Return list of .py files under root."""
    py_files = []
    for dirpath, dirnames, filenames in os.walk(root):
        for f in filenames:
            if f.endswith('.py'):
                py_files.append(os.path.join(dirpath, f))
    return py_files

def main():
    if len(sys.argv) != 3:
        print("Usage: compare.py <dir1> <dir2>")
        sys.exit(1)

    dir1 = sys.argv[1]
    dir2 = sys.argv[2]

    # Find all .py files in both directories
    py1 = find_python_files(dir1)
    py2 = find_python_files(dir2)

    # Map relative paths
    rel1 = {os.path.relpath(p, dir1): p for p in py1}
    rel2 = {os.path.relpath(p, dir2): p for p in py2}

    all_keys = set(rel1.keys()) | set(rel2.keys())

    output = []
    for key in sorted(all_keys):
        if key in rel1 and key in rel2:
            diff = compare_files(rel1[key], rel2[key])
            if diff:
                output.append(f"--- {key}")
                output.extend(diff)
                output.append("")
        elif key in rel1:
            output.append(f"Only in {dir1}: {key}")
        else:
            output.append(f"Only in {dir2}: {key}")

    with open('diff_output.txt', 'w', encoding='utf-8') as f:
        f.write('\n'.join(output))

    print(f"Diff written to diff_output.txt, {len(output)} lines")

if __name__ == '__main__':
    main()
```
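
To show why the script normalizes lines before diffing, here is a minimal, self-contained sketch. The two sample inputs are made up for illustration and are not taken from the firmware. It applies only the version-number substitution and then lists the lines that still differ.

```python
import difflib
import re

# Same pattern the script uses for dotted four-part version numbers.
VERSION_RE = re.compile(r"\b\d+\.\d+\.\d+\.\d+\b")


def normalize(line):
    """Replace version strings such as 16.0.1.017 with a fixed token."""
    return VERSION_RE.sub("VERSION", line)


def changed_lines(old, new):
    """Return only the added/removed lines of a unified diff after normalizing."""
    diff = difflib.unified_diff(
        [normalize(l) for l in old], [normalize(l) for l in new], lineterm=""
    )
    return [
        l for l in diff
        if l[:1] in "+-" and not l.startswith(("---", "+++"))
    ]


# Hypothetical sample input: a version banner plus one real code change.
old = ["# build 16.0.1.017", "import features, qlog"]
new = ["# build 16.0.4.016", "import coro, features, qlog"]
print(changed_lines(old, new))
```

Only the `import` line is reported. The version banner differs between the two inputs but collapses to the same normalized text, which keeps build noise out of the diff.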

References

https://sec.cloudapps.cisco.com/security/center/content/CiscoSecurityAdvisory/cisco-sa-sma-attack-N9bf4

Attachments

dump_code.py

```python
import idaapi
import idc
import ida_bytes

# Adjust this: symbol name (or address) of _PyImport_FrozenModules
FROZEN_SYM = "_PyImport_FrozenModules"
# Python 2.6 pyc magic (62161 + "\r\n") followed by a 4-byte little-endian timestamp
MAGIC_26 = b"\xD1\xF2\x0D\x0A" + b"\xD4\x4E\x44\x67"

def read_cstring(ea):
    """Read a C string using the IDA API."""
    s = idc.get_strlit_contents(ea, -1, idc.STRTYPE_C)
    if s:
        try:
            return s.decode("utf-8")
        except UnicodeDecodeError:
            return s.decode("latin1", errors="ignore")
    return ""

def dump_frozen_modules():
    addr = idc.get_name_ea_simple(FROZEN_SYM)
    if addr == idc.BADADDR:
        print("[!] Symbol {} not found, please set the address in the script manually".format(FROZEN_SYM))
        return

    idx = 0
    while True:
        # 64-bit ELF layout: name_ptr (8 bytes) + code_ptr (8 bytes) + size (4 bytes)
        name_ptr = ida_bytes.get_qword(addr)
        code_ptr = ida_bytes.get_qword(addr + 8)
        size = ida_bytes.get_dword(addr + 16)

        if name_ptr == 0 and code_ptr == 0 and size == 0:
            print("[*] Parsing finished")
            break  # sentinel entry terminates the array

        name = read_cstring(name_ptr)
        is_package = False

        # A negative size marks a package; the dword was read as unsigned
        if size > 0xffff0000:
            real_size = 0xffffffff - size + 1
            is_package = True
        else:
            real_size = size

        print("[+] Found module {} size={} (package={})".format(name, real_size, is_package))

        # Read the marshalled bytecode
        data = ida_bytes.get_bytes(code_ptr, real_size)
        if not data:
            print("    [!] Read failed, address=0x{:x}".format(code_ptr))
        else:
            safe_name = name.replace("/", "_")
            if is_package:
                outname = "{}_pkg.pyc".format(safe_name)
            else:
                outname = "{}.pyc".format(safe_name)

            with open(outname, "wb") as f:
                f.write(MAGIC_26 + data)
            print("    [*] Saved to {}".format(outname))

        addr += 24  # size of each struct _frozen (assuming 64-bit)

        idx += 1

dump_frozen_modules()
```
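
The two layout details the script depends on can be reproduced outside IDA with the standard `struct` module. This is a minimal sketch under the same assumptions as the script (little-endian, 64-bit pointers, 24-byte entries). The helper names `pyc26_header` and `parse_frozen_entry` are hypothetical.

```python
import struct

# Magic number reported by uncompyle6 for Python 2.6 bytecode.
PY26_MAGIC_NUMBER = 62161


def pyc26_header(mtime):
    """Build the 8-byte Python 2.6 pyc header: magic, CRLF, 4-byte mtime."""
    return struct.pack("<H", PY26_MAGIC_NUMBER) + b"\r\n" + struct.pack("<I", mtime)


def parse_frozen_entry(raw):
    """Parse one struct _frozen entry from at least 20 raw bytes.

    Reading size as a signed int makes the package flag explicit:
    a negative size means the module is a package.
    """
    name_ptr, code_ptr, size = struct.unpack_from("<QQi", raw)
    return name_ptr, code_ptr, abs(size), size < 0
```

`pyc26_header(1732529876)` yields the same eight bytes as `MAGIC_26` in the script. That timestamp corresponds to 2024-11-25 10:17:56 UTC, which matches the "Compiled at" line in the decompiler output when shown in UTC+8.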

handle_ipoe.py

```python
import hmac
import zipfile
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import os
import sys
from pathlib import Path
import uncompyle6

ALGNAME = 'aes-128-cbc'
ALG_BLOCK_SIZE = 16
HASH_TAG_SIZE = 20

class IPOEDecryptError(Exception):
    pass

_GK1 = None
_GK2 = None

def get_keys():
    """Return the (AES key, HMAC key) pair, cached after the first call."""
    global _GK1, _GK2
    if _GK1 is not None and _GK2 is not None:
        return (_GK1, _GK2)

    key1 = bytes.fromhex("68003a1a7465f115a0b2387bce5372ea")
    key2 = bytes.fromhex("399a8e2afb8c0160f773826cb5669b99670de403")

    _GK1, _GK2 = key1, key2
    return (key1, key2)

def decrypt_file(fpath):
    """Decrypt one file laid out as: HMAC-SHA1 tag | IV | AES-128-CBC ciphertext."""
    # Use a context manager so the file is closed on every return path
    with open(fpath, "rb") as fobj:
        tag = fobj.read(HASH_TAG_SIZE)
        if not tag:
            return b''
        c_text = fobj.read()

    c_key, h_key = get_keys()

    # Verify HMAC
    calculated_tag = hmac.new(h_key, c_text, 'sha1').digest()
    if not hmac.compare_digest(calculated_tag, tag):
        raise IPOEDecryptError("HMAC verification failed")

    # Extract IV and cipher text
    iv = c_text[:ALG_BLOCK_SIZE]
    cipher_text = c_text[ALG_BLOCK_SIZE:]

    # Decrypt
    cipher = AES.new(c_key, AES.MODE_CBC, iv)
    try:
        decrypted_data = unpad(cipher.decrypt(cipher_text), ALG_BLOCK_SIZE)
    except ValueError:
        # Handle .so files specially (no padding removal)
        if str(fpath).endswith('.so'):
            # A fresh cipher object is needed to decrypt the same data again
            cipher = AES.new(c_key, AES.MODE_CBC, iv)
            decrypted_data = cipher.decrypt(cipher_text)
            # Add null bytes to match original length
            extra_len = len(c_text) + HASH_TAG_SIZE - len(decrypted_data)
            return decrypted_data + b'\x00' * extra_len
        else:
            raise IPOEDecryptError("Padding error during decryption")
    return decrypted_data

def process_directory(directory_path):
    """Recursively decrypt every file under directory_path in place."""
    path = Path(directory_path)

    if not path.exists():
        print(f"Error: Directory {directory_path} does not exist")
        return

    if not path.is_dir():
        print(f"Error: {directory_path} is not a directory")
        return

    for item in path.iterdir():
        if item.is_file():
            dec_data = decrypt_file(str(item.absolute()))
            with open(str(item.absolute()), "wb") as f:
                f.write(dec_data)
        elif item.is_dir():
            process_directory(str(item.absolute()))

def do_decompile(directory_path):
    """Decompile every .pyo file under directory_path to a sibling .py file."""
    for dirpath, _dirnames, filenames in os.walk(directory_path):
        for filename in filenames:
            if not filename.endswith('.pyo'):
                continue

            filepath = os.path.join(dirpath, filename)
            original_filename = os.path.splitext(filename)[0]
            original_filepath = os.path.join(dirpath, original_filename + '.py')
            with open(original_filepath, 'w') as f:
                try:
                    print("Handling %s" % filepath)
                    uncompyle6.decompile_file(filepath, f)
                except Exception:
                    print("Decompile %s failed" % filepath)

def extract_ipoe_files(root_dir, recursive=False):
    """Unzip, decrypt and decompile every .ipoe archive found in root_dir."""
    root_path = Path(root_dir).resolve()

    if not root_path.exists():
        print(f"{root_path} not found")
        return False

    if not root_path.is_dir():
        print(f"{root_path} is not a directory")
        return False

    pattern = "**/*.ipoe" if recursive else "*.ipoe"
    ipoe_files = list(root_path.glob(pattern))

    if not ipoe_files:
        print(f"No ipoe files in {root_path}")
        return False

    print(f"Found {len(ipoe_files)} ipoe files, handling ...\n")

    success_count = 0
    fail_count = 0

    for ipoe_file in ipoe_files:
        try:
            stem = ipoe_file.stem

            if '-' not in stem:
                print(f"Skip: {ipoe_file.name}")
                fail_count += 1
                continue

            # The target directory is the archive name up to the first '-'
            dir_name = stem.split('-', 1)[0].strip()
            if not dir_name:
                print(f"Skip: {ipoe_file.name}")
                fail_count += 1
                continue

            target_dir = ipoe_file.parent / dir_name

            try:
                target_dir.mkdir(parents=True, exist_ok=True)
                print(f"Creating: {target_dir.relative_to(root_path)}")
            except Exception as e:
                print(f"Create ({ipoe_file.name}) failed: {e}")
                fail_count += 1
                continue

            try:
                with zipfile.ZipFile(ipoe_file, 'r') as zip_ref:
                    if zip_ref.testzip() is not None:
                        print(f"Unzip failed: {ipoe_file.name}")
                        fail_count += 1
                        continue

                    zip_ref.extractall(target_dir)
                print(f"{ipoe_file.name} -> {target_dir.relative_to(root_path)} {target_dir}")
                process_directory(target_dir)
                do_decompile(target_dir)
                success_count += 1
            except zipfile.BadZipFile:
                print(f"Unzip failed: {ipoe_file.name}, BadZipFile")
                fail_count += 1
                continue
            except Exception as e:
                print(f"Unzip failed: ({ipoe_file.name}): {e}")
                fail_count += 1
                continue

        except Exception as e:
            print(f"Error: ({ipoe_file.name}), {e}")
            fail_count += 1
        finally:
            print()

    print(f"Done: {success_count} succeeded, {fail_count} failed")
    return success_count > 0

def main():
    if len(sys.argv) != 2:
        print("Usage: python script.py <directory_path>")
        sys.exit(1)

    directory_path = sys.argv[1]
    extract_ipoe_files(directory_path)

if __name__ == "__main__":
    main()
```
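
The file layout that `decrypt_file` expects (a 20-byte HMAC-SHA1 tag, then a 16-byte IV, then the ciphertext, with the tag computed over IV plus ciphertext) can be checked with the standard library alone. The sketch below covers only the split-and-verify step, not the AES decryption. The function name `split_blob` and the demo key are hypothetical.

```python
import hashlib
import hmac

HASH_TAG_SIZE = 20  # HMAC-SHA1 digest length
IV_SIZE = 16        # AES block size


def split_blob(blob, hmac_key):
    """Verify the leading HMAC-SHA1 tag and return (iv, ciphertext)."""
    tag, body = blob[:HASH_TAG_SIZE], blob[HASH_TAG_SIZE:]
    expected = hmac.new(hmac_key, body, hashlib.sha1).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(tag, expected):
        raise ValueError("HMAC verification failed")
    return body[:IV_SIZE], body[IV_SIZE:]


def make_blob(iv, ciphertext, hmac_key):
    """Build a blob in the same layout (used here only to exercise split_blob)."""
    body = iv + ciphertext
    return hmac.new(hmac_key, body, hashlib.sha1).digest() + body
```

A blob produced by `make_blob` and passed to `split_blob` with the same key returns the original IV and ciphertext. Flipping any byte of the body makes verification fail before any decryption is attempted.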
  • Title: CVE-2025-20393
  • Author: Catalpa
  • Created at : 2026-02-05 00:00:00
  • Updated at : 2026-02-05 17:09:04
  • Link: https://wzt.ac.cn/2026/02/05/CVE-2025-20393/
  • License: This work is licensed under CC BY-SA 4.0.