欣欣的量子猫步
初步观察
打开靶场链接,页面是一个终端风格的界面,用了 xterm.js 渲染。页面上有个输入框和发送按钮,看起来像是要跟服务端做某种交互。
先 F12 看看前端代码。在 script.js 里找到了关键逻辑:
const ws = new WebSocket(`ws://${location.host}/socket`);页面通过 WebSocket 连接到 /socket 端点。继续往下看,发现消息处理逻辑:
- 服务端会发 JSON 消息,有个
type字段区分类型 type: story是剧情对话,纯展示用type: task是题目数据,里面有个meta字段包含网格参数type: switch_shell表示答对了,切换到 shell 模式type: gameover表示答错了
再看发送逻辑:
function sendAnswer() { const val = input.value.trim(); if (val) ws.send(val);}注意这里是直接 ws.send(val) 发纯文本,不是 JSON。这个细节很重要,后面会踩坑。
理解题意
连上 WebSocket 后,服务端先播一段剧情文字,然后发来一个 type: task 的消息,格式大概是这样:
{ "type": "task", "content": "请计算期望步数...", "meta": { "N": 75, "obstacles": [[2,3], [5,7], ...], "traps": [[10,15], [30,42], ...] }}题意翻译成人话就是:
有一个 N×N 的网格,上面有一些障碍物和陷阱。一只猫从左上角 (0,0) 开始随机游走,每一步等概率往上下左右四个方向走。如果撞到网格边界或者障碍物,就原地不动(这一步算走了但位置没变)。如果踩到陷阱,游走立刻结束。问:猫掉进陷阱的期望步数是多少?
这是一个经典的吸收马尔可夫链问题。
数学原理(尽量说人话)
什么是期望步数?
简单说就是”平均要走多少步才会踩到陷阱”。因为每一步是随机的,所以不同的运气会走不同的步数,期望就是所有可能情况的加权平均。
怎么算?
对网格上每个普通格子(不是障碍物也不是陷阱的格子),我们定义 E(r,c) 表示”从这个格子出发,平均还要走多少步才会掉进陷阱”。
对于格子 (r,c),猫下一步有四个方向可以走,每个方向概率 1/4:
- 如果下一步走到普通格子 (nr,nc):还需要 E(nr,nc) 步
- 如果下一步走到陷阱:0 步(已经结束了)
- 如果下一步撞墙或撞障碍物:原地不动,还需要 E(r,c) 步
所以有方程:
E(r,c) = 1 + (1/4) * [走到普通格子的E之和 + 撞墙次数 * E(r,c) + 0 * 踩陷阱次数]这里的 1 代表”走这一步本身就花了 1 步”。
把所有普通格子的方程列出来,就得到一个线性方程组。格子数量可能很多(比如 75×75 = 5625 个格子),所以用稀疏矩阵来存储和求解,效率会高很多。
举个小例子
假设 3×3 网格,(2,2) 是陷阱,没有障碍物:
. . .. . .. . T从 (0,0) 出发,猫要随机走到右下角的陷阱。直觉上这需要不少步,因为猫经常会撞墙或者在中间晃来晃去。
把每个非陷阱格子列一个方程,解出来就能得到 E(0,0)。
实现思路
第一步:BFS 找可达格子
这里有个坑:网格上可能有些格子被障碍物完全围住了,从 (0,0) 根本走不到。这些格子的期望步数是无穷大,如果把它们也放进方程组里,矩阵就会变成奇异矩阵(singular matrix),解不出来。
解决办法很简单:从 (0,0) 开始做一次 BFS(广度优先搜索),只把能走到的格子放进方程组。BFS 的时候,障碍物和陷阱都不能通过(陷阱是吸收态,不需要算它的期望步数)。
第二步:建立线性方程组
对每个可达的普通格子,列一个方程。整理成 Ax = b 的形式:
- A 是系数矩阵(稀疏的,因为每个格子最多跟 4 个邻居有关系)
- x 是未知数向量(每个格子的期望步数)
- b 是常数向量(全是 1,因为每步花费 1)
具体来说,对于格子 i = (r,c),方程是:
E(i) = 1 + (1/4) * Σ(各方向的贡献)移项整理后:
E(i) - (1/4)*Σ(邻居的E) = 1其中撞墙/撞障碍物的方向,贡献的是 E(i) 自己(原地不动),所以这些项会合并到左边 E(i) 的系数里。
第三步:求解
用 scipy 的 spsolve 求解稀疏线性方程组,取出 E(0,0) 就是答案。保留两位小数发回去。
踩坑记录
坑1:答案格式
一开始我以为要发 JSON 格式的答案,比如 {"type":"answer","content":"534.99"}。结果服务端不认。
回去看前端代码才发现,sendAnswer 函数就是直接 ws.send(val) 发纯文本。所以答案就是一个数字字符串,比如 "534.99"。
坑2:矩阵奇异
第一版代码对所有非障碍非陷阱格子都建方程,跑出来 spsolve 返回 nan。debug 了一下发现是矩阵奇异。
原因是有些格子虽然不是障碍物,但被障碍物围死了,从 (0,0) 走不到。这些格子永远到不了陷阱,期望步数是无穷大,放进方程组就会导致矩阵不可逆。
加了 BFS 只处理可达格子之后就好了。
坑3:flag 位置
拿到 shell 后,第一反应是 cat /flag,结果读出来是 flag{0000000000}——这是 GZCTF 平台的动态 flag 占位符,不是真 flag。
真正的 flag 在 /tmp/flag。所以拿到 shell 后要多翻翻,不能只看常规位置。
完整复现步骤
-
连接 WebSocket:用 Python 的
websocket-client库连接ws://<host>:<port>/socket -
等待题目数据:服务端先发一堆
type: story的剧情消息,不用管。等到收到type: task的消息,从meta字段里取出N(网格大小)、obstacles(障碍物坐标列表)、traps(陷阱坐标列表) -
BFS 找可达格子:从 (0,0) 出发,用 BFS 遍历所有能走到的普通格子(不穿过障碍物和陷阱)
-
建立稀疏线性方程组:对每个可达格子列方程,用 scipy 的
lil_matrix构建稀疏矩阵 -
求解:调用
spsolve解方程组,取出 E(0,0) -
发送答案:把结果格式化成两位小数的纯文本字符串,通过 WebSocket 发送。比如
"534.99" -
进入 shell:如果答案正确,服务端会发
type: switch_shell消息,之后 WebSocket 就变成了一个 PTY shell,可以执行任意命令 -
读取 flag:执行
cat /tmp/flag拿到真正的 flag。注意/flag里的是假的占位符
依赖安装
跑 EXP 之前需要装几个库:
pip install websocket-client numpy scipyEXP
import websocketimport jsonimport timeimport reimport numpy as npimport threadingfrom collections import dequefrom scipy.sparse import lil_matrixfrom scipy.sparse.linalg import spsolve
# ========== 靶机地址,按需填写 ==========HOST = ""PORT = ""WS_URL = f"ws://{HOST}:{PORT}/socket"
shell_mode = threading.Event()flag_found = [None]
def solve_expected_steps(N, obstacles, traps): """ 求解从 (0,0) 出发掉入陷阱的期望步数。 用吸收马尔可夫链建模,BFS + 稀疏线性方程组求解。 """ obstacle_set = set() trap_set = set() for o in obstacles: obstacle_set.add((o[0], o[1])) for t in traps: trap_set.add((t[0], t[1]))
# 起点就是陷阱,期望 0 步 if (0, 0) in trap_set: return 0.0
dirs = [(-1, 0), (1, 0), (0, -1), (0, 1)]
# BFS 从 (0,0) 找所有可达的普通格子 # 不穿过障碍物和陷阱 reachable = set() queue = deque([(0, 0)]) reachable.add((0, 0)) while queue: r, c = queue.popleft() for dr, dc in dirs: nr, nc = r + dr, c + dc if 0 <= nr < N and 0 <= nc < N: if (nr, nc) not in obstacle_set and (nr, nc) not in trap_set and (nr, nc) not in reachable: reachable.add((nr, nc)) queue.append((nr, nc))
# 给每个可达格子编号 states = sorted(reachable) state_index = {s: i for i, s in enumerate(states)} n = len(states)
# 建立稀疏矩阵 A 和向量 b # 方程:E(i) - (1/4)*Σ(邻居E) = 1 A = lil_matrix((n, n)) b = np.ones(n) # 右边全是 1
for idx, (r, c) in enumerate(states): stay = 0 # 原地不动的方向数 for dr, dc in dirs: nr, nc = r + dr, c + dc if nr < 0 or nr >= N or nc < 0 or nc >= N or (nr, nc) in obstacle_set: # 撞墙或撞障碍物 → 原地不动 stay += 1 elif (nr, nc) in trap_set: # 踩陷阱 → 吸收,贡献 0 pass elif (nr, nc) in reachable: # 走到普通格子 → 贡献 -0.25 * E(neighbor) A[idx, state_index[(nr, nc)]] -= 0.25 else: # 不可达的格子当墙处理 stay += 1
# 对角线:1 - stay/4(因为原地不动的概率是 stay/4) # 但原地不动贡献的是 E(i) 自己,移项后系数变成 1 - stay/4 # 等等,这里要仔细想: # E(i) = 1 + (stay/4)*E(i) + (1/4)*Σ(邻居E) + 0*(陷阱) # 移项:E(i) - (stay/4)*E(i) - (1/4)*Σ(邻居E) = 1 # 即:(1 - stay/4)*E(i) - (1/4)*Σ(邻居E) = 1 A[idx, idx] += 1.0 - stay / 4.0
# 求解 Ax = b E = spsolve(A.tocsr(), b) return E[state_index[(0, 0)]]
def on_message(ws, message): """处理 WebSocket 收到的消息""" try: data = json.loads(message) except: # 非 JSON 消息 → shell 模式下的命令输出 if shell_mode.is_set(): print(f"[SHELL] {message}") # 去掉 ANSI 转义码再匹配 flag clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', message) if 'flag{' in clean or 'FLAG{' in clean: m = re.search(r'flag\{[^}]+\}', clean, re.IGNORECASE) if m and m.group(0) != 'flag{0000000000}': flag_found[0] = m.group(0) return
msg_type = data.get('type', '') content = data.get('content', '') meta = data.get('meta')
if msg_type == 'task' and meta: # 收到题目,解题 N = int(meta['N']) obstacles = meta.get('obstacles', []) traps = meta.get('traps', []) answer = solve_expected_steps(N, obstacles, traps) fmt = f"{answer:.2f}" print(f"[ANSWER] {fmt}") # 直接发纯文本,不要发 JSON ws.send(fmt)
elif msg_type == 'switch_shell': # 答对了,进入 shell print("[SHELL MODE]") shell_mode.set() time.sleep(2) # 依次尝试读 flag for cmd in ["cat /tmp/flag", "cat /tmp/flag.txt", "cat /flag", "env | grep FLAG"]: ws.send(cmd + "\n") time.sleep(3)
# 也检查 JSON 消息里有没有 flag if isinstance(content, str) and 'flag{' in content: m = re.search(r'flag\{[^}]+\}', content, re.IGNORECASE) if m and m.group(0) != 'flag{0000000000}': flag_found[0] = m.group(0)
def on_error(ws, error): print(f"[ERROR] {error}")
def on_close(ws, code, msg): print("[CLOSED]")
def on_open(ws): print("[CONNECTED]")
def main(): ws = websocket.WebSocketApp(WS_URL, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)
# 在后台线程跑 WebSocket t = threading.Thread(target=lambda: ws.run_forever(ping_interval=60)) t.daemon = True t.start()
# 等 flag,最多等 3 分钟 start = time.time() while time.time() - start < 180: time.sleep(1) if flag_found[0]: print(f"[FLAG] {flag_found[0]}") break
time.sleep(5) try: ws.close() except: pass
if __name__ == "__main__": main()