0


FirmAE源码粗读(一)

文章目录

简介

这篇一直说着要写结果一直鸽来着…
争取长期诈尸式更新
目标是对

FirmAE

的源码做一个大致梳理,供日后参考。
由于

FirmAE

本身就是基于

firmdyne

的二开成品,而且基本上没有修改

firmdyne

的代码与代码结构,所以本文实际上也是对

firmdyne

代码的一个梳理。
顺便还能给大家看看什么叫狗尾续貂

FirmAE

主目录下有七个文件夹,内容大致如下:

  • util还没看 不知道干嘛的
  • source 工具包文件夹,包括了重要的提取器extractor、hook掉nvram调用的libnvram和比较一般的固件爬虫scraperconsole
  • scripts 模拟过程中需要用到的脚本,包括重要的makenetwork.pymakeimage.py
  • firmwares 空的 用来给你放固件用
  • database 只有一个schema文件,用来初始化数据库
  • core没看 不知道
  • analyses``````FirmAE自己写的fuzz,外加routersploit

主目录下的脚本文件主要是用于安装、调试和实现docker功能的,其中就包括了

FirmAE

的主体框架和运行入口

run.sh

,就从这里讲起。

run.sh

FirmAE的代码结构主体。其主要功能位于

run_emulation

函数中。
本质上就是个脚手架,并不复杂。一步步调用各种函数并得到运行结果,顺着看一遍很容易理清楚大致流程,这里不对源码进行分析了。
运行流程大致如下:
提取文件系统——提取内核——检查架构——封装qemu镜像——运行模拟,开启网络服务——提供交互终端

这个脚本中也包含了各种

log

的生成,基本上都是通过

echo > ./xxx/xxx

的形式实现的,包括了

brand、name

以及各步运行花费的时间等。

extractor.py

在提取文件系统后会生成相应的

tar.gz

(非全量,仅提取了构建系统必要的目录),该文件既用来判断提取成功与否,也用于获取架构和制作镜像。

/scratch/iid

为每个固件对应的工作目录,在脚本中为

${SCRATCH_DIR}/${IID}

注意一点,开头

souce

firmae.config

,也就是这个

config

后缀的文件,实际上里面有一堆工具函数…不愧是你

if[-e ./firmae.config ];thensource ./firmae.config
elif[-e../firmae.config ];thensource../firmae.config
elseecho"Error: Could not find 'firmae.config'!"exit1fi

从这个脚本文件中也可以看出负责实际模拟的几个重要脚本:

extractor.py

(文件提取)、

makeImage.sh

(镜像制作)、

makeNetwork.py

(qemu模拟+网络服务启动)

firmae.config

除了配置参数外,主要是一些

run.sh

中会用到的工具函数,比如说

get_scratch

。**基本上你在

run.sh

中看到的类似linux命令但又不是的其实都是这个文件中的sh脚本函数。**
同时也包括了网络测试函数

check_network

(通过

ping

curl

测试网络联通性)、虚拟磁盘创建、卸载函数

add/del_partition
get_scratch(){if check_number "${1}";thenecho"Error: Invalid image number!"exit1fiecho"${SCRATCH_DIR}/${IID}"
    只是输出工作目录...
}

util.py

类似

firmae.config

,提供一些工具函数。本质上是将sql语句封装成函数方便调用。

#!/usr/bin/env python3import sys
import psycopg2
import hashlib

def io_md5(target):
    blocksize =65536
    hasher = hashlib.md5()

    with open(target, 'rb') as ifp:
        buf = ifp.read(blocksize)while buf:
            hasher.update(buf)
            buf = ifp.read(blocksize)return hasher.hexdigest()#md5计算函数,常用来计算固件md5以存入数据库 

def query_(query, psql_ip):
    try:
        dbh = psycopg2.connect(database="firmware",
                               user="firmadyne",
                               password="firmadyne",
                               host=psql_ip)
        cur = dbh.cursor()
        cur.execute(query)return cur.fetchone()

    except:
        return None
        
#数据库连接函数,该数据库在安装脚本中创建

def get_iid(infile, psql_ip):
    md5 = io_md5(infile)
    q ="SELECT id FROM image WHERE hash = '%s'" % md5
    image_id = query_(q, psql_ip)if image_id:
        return image_id[0]
    else:
        return""#获取iid(image id),iid是firmae每个固件除md5外的唯一识别号

def get_brand(infile, psql_ip):
    md5 = io_md5(infile)
    q ="SELECT brand_id FROM image WHERE hash = '%s'" % md5
    brand_id = query_(q, psql_ip)if brand_id:
        q ="SELECT name FROM brand WHERE id = '%s'" % brand_id
        brand = query_(q, psql_ip)if brand:
            return brand[0]
        else:
            return""
    else:
        return""#获取品牌

def check_connection(psql_ip):
    try:
        dbh = psycopg2.connect(database="firmware",
                               user="firmadyne",
                               password="firmadyne",
                               host=psql_ip)
        dbh.close()return0
    except:
        return1#检查数据库联通性# command lineif __name__ =='__main__':[infile, psql_ip]= sys.argv[2:4]if sys.argv[1]=='get_iid':
        print(get_iid(infile, psql_ip))if sys.argv[1]=='get_brand':
        print(get_brand(infile, psql_ip))if sys.argv[1]=='check_connection':
        exit(check_connection(psql_ip))

getArch.py

遍历目录寻找web服务端与可执行文件,通过这些文件获取架构与大小端。
可执行文件会输出在工作目录下的

fileList

中。

#!/usr/bin/env python3import sys
import tarfile
import subprocess
import psycopg2

archMap ={"MIPS64":"mips64","MIPS":"mips","ARM64":"arm64","ARM":"arm","Intel 80386":"intel","x86-64":"intel64","PowerPC":"ppc","unknown":"unknown"}

endMap ={"LSB":"el","MSB":"eb"}defgetArch(filetype):for arch in archMap:if filetype.find(arch)!=-1:return archMap[arch]returnNonedefgetEndian(filetype):for endian in endMap:if filetype.find(endian)!=-1:return endMap[endian]returnNone

infile = sys.argv[1]
psql_ip = sys.argv[2]
base = infile[infile.rfind("/")+1:]
iid = base[:base.find(".")]

tar = tarfile.open(infile,'r')#输入为文件系统打包的tar.gz

infos =[]
fileList =[]for info in tar.getmembers():ifany([info.name.find(binary)!=-1for binary in["/busybox","/alphapd","/boa","/http","/hydra","/helia","/webs"]]):
        infos.append(info)elifany([info.name.find(path)!=-1for path in["/sbin/","/bin/"]]):
        infos.append(info)
    fileList.append(info.name)withopen("scratch/"+ iid +"/fileList","w")as f:for filename in fileList:try:
            f.write(filename +"\n")except:continue#根据目录名枚举web服务端/可执行文件,并写入fileListfor info in infos:
    tar.extract(info, path="/tmp/"+ iid)
    filepath ="/tmp/"+ iid +"/"+ info.name
    filetype = subprocess.check_output(["file", filepath]).decode()

    arch = getArch(filetype)
    endian = getEndian(filetype)if arch and endian:print(arch + endian)
        subprocess.call(["rm","-rf","/tmp/"+ iid])
        dbh = psycopg2.connect(database="firmware",
                               user="firmadyne",
                               password="firmadyne",
                               host=psql_ip)
        cur = dbh.cursor()
        query ="""UPDATE image SET arch = '%s' WHERE id = %s;"""
        cur.execute(query %(arch+endian, iid))
        dbh.commit()withopen("scratch/"+ iid +"/fileType","w")as f:
            f.write(filetype)break#连接数据库,更新架构与大小端

subprocess.call(["rm","-rf","/tmp/"+ iid])

相信大家看到这里已经知道这是什么玩意了…它遍历每一个二进制文件,问题是留下的信息只有最后一个的…

firmdyne

用的是

getArch.sh

FirmAE

改写成了现在这个池沼代码)

tar2db.py

检索文件系统封装成的

tar.gz

,更新

postgresql

数据库信息。
主要用到的是两个表:

object

object_to_image

。前者只有文件id和文件哈希,似乎是为了判断不同固件中的文件是否重复(不过考虑到不同固件中大概率不会有一模一样的文件,所以用处可能不怎么大,而且出现相同文件时也没有什么提示);后者则是对固件中封装进镜像的文件信息做一个全面的记录。
————————————————————————————————————————
23.1.19更新。Firmadyne官方提供了关于数据库的详细解释,不看文档就嗯写的我是xx
数据库结构传送门

#!/usr/bin/env python3import tarfile
import getopt
import sys
import re
import hashlib
import psycopg2

psql_ip =''defgetFileHashes(infile):
    t = tarfile.open(infile)
   
    files =list()
    links =list()for f in t.getmembers():if f.isfile():# we use f.name[1:] to get rid of the . at the beginning of the path
            files.append((f.name[1:], hashlib.md5(t.extractfile(f).read()).hexdigest(),
                          f.uid, f.gid, f.mode))elif f.issym():
            links.append((f.name[1:], f.linkpath))return(files, links)'''遍历目录以获取文件的md5、uid、gid,处理符号链接,
    跳过文件名开头的.,返回文件信息、符号链接信息'''defgetOids(objs, cur):# hashes ... all the hashes in the tar file
    hashes =[x[1]for x in objs]
    hashes_str =",".join(["""'%s'"""% x for x in hashes])
    query ="""SELECT id,hash FROM object WHERE hash IN (%s)"""
    cur.execute(query % hashes_str)
    res =[(int(x), y)for(x, y)in cur.fetchall()]#res是由object表中id和hash组成的键值对
    
    existingHashes =[x[1]for x in res]

    missingHashes =set(hashes).difference(set(existingHashes))

    newObjs = createObjects(missingHashes, cur)#在object表中插入新哈希键值对

    res += newObjs

    result =dict([(y, x)for(x, y)in res])return result

    '''查询文件哈希,更新缺少的哈希键值对,
    返回值为文件系统中全部文件在更新后的object表内的hash/id键值对'''defcreateObjects(hashes, cur):
    query ="""INSERT INTO object (hash) VALUES (%(hash)s) RETURNING id"""
    res =list()for h inset(hashes):try:
            cur.execute(query,{'hash':h})
            oid =int(cur.fetchone()[0])
            res.append((oid, h))except:continuereturn res

definsertObjectToImage(iid, files2oids, links, cur):
    query ="""INSERT INTO object_to_image (iid, oid, filename, regular_file, uid, gid, permissions) VALUES (%(iid)s, %(oid)s, %(filename)s, %(regular_file)s, %(uid)s, %(gid)s, %(mode)s)"""try:
        cur.executemany(query,[{'iid': iid,'oid': x[1],'filename': x[0][0],'regular_file':True,'uid': x[0][1],'gid': x[0][2],'mode': x[0][3]} \
                            for x in files2oids])
        cur.executemany(query,[{'iid': iid,'oid':1,'filename': x[0],'regular_file':False,'uid':None,'gid':None,'mode':None} \
                            for x in links])except:returndefprocess(iid, infile):global psql_ip
    dbh = psycopg2.connect(database="firmware",
                           user="firmadyne",
                           password="firmadyne",
                           host=psql_ip)#注意看install.sh,里面有数据库相关  
    cur = dbh.cursor()(files, links)= getFileHashes(infile)#获取文件系统中的文件md5、uid、gid,获取符号链接信息
    oids = getOids(files, cur)#更新数据库哈希
    fdict =dict([(h,(filename, uid, gid, mode)) \
            for(filename, h, uid, gid, mode)in files])    
    
    file2oid =[(fdict[h], oid)for(h, oid)in oids.items()]#数据重组
    insertObjectToImage(iid, file2oid, links, cur)#将本次提取得到的全量文件数据存入object_to_image表中
    dbh.commit()

    dbh.close()defmain():global psql_ip
    infile = iid =None
    opts, argv = getopt.getopt(sys.argv[1:],"f:i:h:")for k, v in opts:if k =='-i':
            iid =int(v)if k =='-f':
            infile = v
        if k =='-h':
            psql_ip = v

    #一般来说三个参数都要用到if infile andnot iid:
        m = re.search(r"(\d+)\.tar\.gz", infile)if m:
            iid =int(m.groups(1))#未指定iid则从tar.gz文件名中获取
    process(iid, infile)if __name__ =="__main__":
    main()

后面的(二)部分应该会写一写

extractor.py

,争取这周搞完。

标签: 安全 pwn iot

本文转载自: https://blog.csdn.net/weixin_45209963/article/details/127856504
版权归原作者 xyzmpv 所有, 如有侵权,请联系我们删除。

“FirmAE源码粗读(一)”的评论:

还没有评论