Add new disassembler class

pull/2/head
Bernhard Mueller 7 years ago
parent df965ea723
commit bae397512d
  1. 2
      README.md
  2. 92
      disassembler/disassembly.py
  3. 1
      disassembler/signatures.json
  4. 6
      myth
  5. 62
      mythril/ether/asm.py
  6. 12
      mythril/ether/ethcontract.py
  7. 12
      tests/ethcontract_test.py

@ -70,7 +70,7 @@ vm op=PUSH1 gas=b'20997' stack=[b'96'] depth=0 steps=1 inst=96 pushvalue=64 pc=b
vm op=MSTORE gas=b'20994' stack=[b'96', b'64'] depth=0 steps=2 inst=82 pc=b'4' vm op=MSTORE gas=b'20994' stack=[b'96', b'64'] depth=0 steps=2 inst=82 pc=b'4'
``` ```
Do note however that the disassembly / debugging functionality is still quite bare-bones. For manual analysis & debugging I recommend using [remix](https://remix.ethereum.org/) and [etherscan](https://etherscan.io). Do note however that the instruction_list / debugging functionality is still quite bare-bones. For manual analysis & debugging I recommend using [remix](https://remix.ethereum.org/) and [etherscan](https://etherscan.io).
#### Finding cross-references #### Finding cross-references

@ -0,0 +1,92 @@
from mythril.ether import asm,evm,util
from mythril.rpc.client import EthJsonRpc
from ethereum import utils
import binascii
import sys
import os
import json
class Block:
    """A contiguous run of instructions inside a disassembled instruction list.

    Instances are plain records; the length is filled in later through
    ``update_length()`` once the end of the block is known.
    """

    def __init__(self, code_index, start_addr, funcname):
        """Create a block.

        Args:
            code_index: Index of the block's first instruction in the
                instruction list.
            start_addr: Bytecode address of the block's first instruction.
            funcname: Label shown for the block.
        """
        self.code_index = code_index
        # Previously accepted but dropped; keep it so callers can read it back.
        self.start_addr = start_addr
        self.funcname = funcname
        # Cross-references; nothing in the visible code populates this yet.
        self.xrefs = []
        # Defined up front so reading ``length`` before update_length() does
        # not raise AttributeError.
        self.length = 0

    def update_length(self, num_instructions):
        """Record the number of instructions that belong to this block."""
        self.length = num_instructions
class Disassembly:
    """Disassembled bytecode split into labelled blocks.

    Construction disassembles ``code``, scans for ``PUSH4``/``EQ`` sequences
    to map function hashes to jump targets (names come from the
    ``signatures.json`` file next to this module), and then cuts the
    instruction list into blocks at every ``JUMPDEST``.
    """

    def __init__(self, code):
        """Disassemble ``code`` and build the block list.

        Args:
            code: Encoded bytecode; it is passed through ``util.safe_decode``
                before disassembly.
        """
        self.instruction_list = asm.disassemble(util.safe_decode(code))
        self.blocks = []
        # function name -> jump target address
        self.func_to_addr = {}
        # jump target address -> function name. Must be a dict: addresses are
        # sparse, so assigning into a list by address always raised IndexError.
        self.addr_to_func = {}

        # Parse jump table & resolve function names
        script_dir = os.path.dirname(os.path.realpath(__file__))
        signature_file = os.path.join(script_dir, 'signatures.json')

        with open(signature_file) as f:
            signatures = json.load(f)

        jmptable_indices = asm.find_opcode_sequence(["PUSH4", "EQ"], self.instruction_list)

        for i in jmptable_indices:
            func_hash = self.instruction_list[i]['argument']

            try:
                func_name = signatures[func_hash]
            except KeyError:
                func_name = "UNK_" + func_hash

            try:
                # The instruction after PUSH4/EQ is expected to push the jump
                # destination. EVM jump destinations are absolute code
                # addresses, so the pushed value is used as-is.
                offset = self.instruction_list[i + 2]['argument']
                jump_target = int(offset, 16)
            except (IndexError, KeyError, TypeError, ValueError):
                # Best effort: the sequence is not a jump-table entry (no
                # following instruction, no argument, or not a hex value).
                continue

            self.func_to_addr[func_name] = jump_target
            self.addr_to_func[jump_target] = func_name

        # Parse instructions into basic blocks
        current_block = Block(0, 0, "prologue")
        blocklen = 0

        for index, instruction in enumerate(self.instruction_list):
            if instruction['opcode'] == "JUMPDEST":
                func_name = self.addr_to_func.get(instruction['address'], "UNKNOWN_JUMPDEST")

                # Close the running block *before* counting the JUMPDEST, so
                # the JUMPDEST belongs to the block it opens.
                current_block.update_length(blocklen)
                self.blocks.append(current_block)

                current_block = Block(index, instruction['address'], func_name)
                blocklen = 0

            blocklen += 1

        # The last block has no trailing JUMPDEST to close it.
        current_block.update_length(blocklen)
        self.blocks.append(current_block)

    def get_easm(self):
        """Return a text listing of all blocks.

        Each non-empty block is rendered as a header line (address of its
        first instruction plus the block label) followed by its instructions.
        For a block opened by a ``JUMPDEST`` the header stands in for that
        instruction; any other block (the prologue) lists every instruction.
        """
        parts = []

        for block in self.blocks:
            if block.length < 1:
                # Empty prologue (empty code, or code that starts with a
                # JUMPDEST): nothing to print and no first instruction to read.
                continue

            first = self.instruction_list[block.code_index]
            parts.append(str(first['address']) + " --- " + block.funcname + "---\n")

            start = block.code_index
            if first['opcode'] == "JUMPDEST":
                start += 1

            parts.append(asm.instruction_list_to_easm(
                self.instruction_list[start:block.code_index + block.length]))

        return "".join(parts)

File diff suppressed because one or more lines are too long

@ -5,6 +5,7 @@
""" """
from mythril.ether import asm,evm,util from mythril.ether import asm,evm,util
from disassembler.disassembly import Disassembly
from mythril.ether.contractstorage import get_persistent_storage from mythril.ether.contractstorage import get_persistent_storage
from mythril.rpc.client import EthJsonRpc from mythril.rpc.client import EthJsonRpc
from ethereum import utils from ethereum import utils
@ -78,11 +79,12 @@ if (args.disassemble):
exitWithError("Disassembler: Provide the input bytecode via -c BYTECODE or --id ID") exitWithError("Disassembler: Provide the input bytecode via -c BYTECODE or --id ID")
try: try:
disassembly = asm.disassemble(util.safe_decode(encoded_bytecode)) disassembly = Disassembly(encoded_bytecode)
# instruction_list = asm.disassemble(util.safe_decode(encoded_bytecode))
except binascii.Error: except binascii.Error:
exitWithError("Disassembler: Invalid code string.") exitWithError("Disassembler: Invalid code string.")
easm_text = asm.disassembly_to_easm(disassembly) easm_text = disassembly.get_easm()
if (args.outfile): if (args.outfile):
util.string_to_file(args.outfile, easm_text) util.string_to_file(args.outfile, easm_text)

@ -2,30 +2,34 @@ import sys
import re import re
import codecs import codecs
from ethereum import opcodes from ethereum import opcodes
from mythril.ether import util
regex_PUSH = re.compile('^PUSH(\d*)$') regex_PUSH = re.compile('^PUSH(\d*)$')
def disassembly_to_easm(disassembly): def instruction_list_to_easm(instruction_list):
easm = "" easm = ""
for instruction in disassembly: # print(instruction_list)
easm += instruction['opcode']
for instruction in instruction_list:
easm += str(instruction['address']) + " " + instruction['opcode']
if 'argument' in instruction: if 'argument' in instruction:
easm += " 0x" + codecs.decode(instruction['argument'], 'utf-8') easm += " " + instruction['argument']
easm += "\n" easm += "\n"
return easm return easm
def easm_to_disassembly(easm): def easm_to_instruction_list(easm):
regex_CODELINE = re.compile('^([A-Z0-9]+)(?:\s+([0-9a-fA-Fx]+))?$') regex_CODELINE = re.compile('^([A-Z0-9]+)(?:\s+([0-9a-fA-Fx]+))?$')
disassembly = [] instruction_list = []
codelines = easm.split('\n') codelines = easm.split('\n')
@ -44,9 +48,9 @@ def easm_to_disassembly(easm):
if m.group(2): if m.group(2):
instruction['argument'] = m.group(2)[2:] instruction['argument'] = m.group(2)[2:]
disassembly.append(instruction) instruction_list.append(instruction)
return disassembly return instruction_list
def get_opcode_from_name(name): def get_opcode_from_name(name):
@ -60,20 +64,20 @@ def get_opcode_from_name(name):
raise RuntimeError("Unknown opcode") raise RuntimeError("Unknown opcode")
def find_opcode_sequence(pattern, disassembly): def find_opcode_sequence(pattern, instruction_list):
match_indexes = [] match_indexes = []
pattern_length = len(pattern) pattern_length = len(pattern)
for i in range(0, len(disassembly) - pattern_length): for i in range(0, len(instruction_list) - pattern_length):
if disassembly[i]['opcode'] == pattern[0]: if instruction_list[i]['opcode'] == pattern[0]:
matched = True matched = True
for j in range(1, len(pattern)): for j in range(1, len(pattern)):
if not (disassembly[i + j]['opcode'] == pattern[j]): if not (instruction_list[i + j]['opcode'] == pattern[j]):
matched = False matched = False
break break
@ -85,24 +89,26 @@ def find_opcode_sequence(pattern, disassembly):
def disassemble(bytecode): def disassemble(bytecode):
disassembly = [] instruction_list = []
i = 0 addr = 0
while i < len(bytecode): while addr < len(bytecode):
instruction = {} instruction = {}
instruction['address'] = addr
try: try:
if (sys.version_info > (3, 0)): if (sys.version_info > (3, 0)):
opcode = opcodes.opcodes[bytecode[i]] opcode = opcodes.opcodes[bytecode[addr]]
else: else:
opcode = opcodes.opcodes[ord(bytecode[i])] opcode = opcodes.opcodes[ord(bytecode[addr])]
except KeyError: except KeyError:
# invalid opcode # invalid opcode
disassembly.append({'opcode': "INVALID"}) instruction_list.append({'address': addr, 'opcode': "INVALID"})
i += 1 addr += 1
continue continue
instruction['opcode'] = opcode[0] instruction['opcode'] = opcode[0]
@ -110,22 +116,22 @@ def disassemble(bytecode):
m = re.search(regex_PUSH, opcode[0]) m = re.search(regex_PUSH, opcode[0])
if m: if m:
argument = bytecode[i+1:i+1+int(m.group(1))] argument = bytecode[addr+1:addr+1+int(m.group(1))]
instruction['argument'] = codecs.encode(argument, "hex_codec") instruction['argument'] = "0x" + argument.hex()
i += int(m.group(1)) addr += int(m.group(1))
disassembly.append(instruction) instruction_list.append(instruction)
i += 1 addr += 1
return disassembly return instruction_list
def assemble(disassembly): def assemble(instruction_list):
bytecode = b"" bytecode = b""
for instruction in disassembly: for instruction in instruction_list:
try: try:
opcode = get_opcode_from_name(instruction['opcode']) opcode = get_opcode_from_name(instruction['opcode'])
@ -136,6 +142,6 @@ def assemble(disassembly):
if 'argument' in instruction: if 'argument' in instruction:
bytecode += codecs.decode(instruction['argument'], 'hex_codec') bytecode += util.safe_decode(instruction['argument'])
return bytecode return bytecode

@ -13,30 +13,30 @@ class ETHContract(persistent.Persistent):
def get_xrefs(self): def get_xrefs(self):
disassembly = asm.disassemble(util.safe_decode(self.code)) instruction_list = asm.disassemble(util.safe_decode(self.code))
xrefs = [] xrefs = []
for instruction in disassembly: for instruction in instruction_list:
if instruction['opcode'] == "PUSH20": if instruction['opcode'] == "PUSH20":
if instruction['argument']: if instruction['argument']:
addr = instruction['argument'].decode("utf-8") addr = instruction['argument']
if (re.match(r'^[a-zA-Z0-9]{40}$', addr) and addr != "ffffffffffffffffffffffffffffffffffffffff"): if (re.match(r'^0x[a-zA-Z0-9]{40}$', addr) and addr != "0xffffffffffffffffffffffffffffffffffffffff"):
if addr not in xrefs: if addr not in xrefs:
xrefs.append(addr) xrefs.append(addr)
return xrefs return xrefs
def get_disassembly(self): def get_instruction_list(self):
return asm.disassemble(util.safe_decode(self.code)) return asm.disassemble(util.safe_decode(self.code))
def get_easm(self): def get_easm(self):
return asm.disassembly_to_easm(asm.disassemble(util.safe_decode(self.code))) return asm.instruction_list_to_easm(asm.disassemble(util.safe_decode(self.code)))
def matches_expression(self, expression): def matches_expression(self, expression):

@ -7,15 +7,15 @@ class ETHContractTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.code = "0x60606040525b603c5b60006010603e565b9050593681016040523660008237602060003683856040603f5a0204f41560545760206000f35bfe5b50565b005b73c3b2ae46792547a96b9f84405e36d0e07edcd05c5b905600a165627a7a7230582062a884f947232ada573f95940cce9c8bfb7e4e14e21df5af4e884941afb55e590029" self.code = "0x60606040525b603c5b60006010603e565b9050593681016040523660008237602060003683856040603f5a0204f41560545760206000f35bfe5b50565b005b73c3b2ae46792547a96b9f84405e36d0e07edcd05c5b905600a165627a7a7230582062a884f947232ada573f95940cce9c8bfb7e4e14e21df5af4e884941afb55e590029"
class GetDisassemblyTestCase(ETHContractTestCase): class Getinstruction_listTestCase(ETHContractTestCase):
def runTest(self): def runTest(self):
contract = ETHContract(self.code) contract = ETHContract(self.code)
disassembly = contract.get_disassembly() instruction_list = contract.get_instruction_list()
self.assertEqual(len(disassembly), 71, 'Error disassembling code using ETHContract.get_disassembly()') self.assertEqual(len(instruction_list), 71, 'Error disassembling code using ETHContract.get_instruction_list()')
class GetEASMTestCase(ETHContractTestCase): class GetEASMTestCase(ETHContractTestCase):
@ -23,9 +23,9 @@ class GetEASMTestCase(ETHContractTestCase):
contract = ETHContract(self.code) contract = ETHContract(self.code)
disassembly = contract.get_easm() instruction_list = contract.get_easm()
self.assertTrue("PUSH1 0x60" in disassembly,'Error obtaining EASM code through ETHContract.get_easm()') self.assertTrue("PUSH1 0x60" in instruction_list,'Error obtaining EASM code through ETHContract.get_easm()')
class MatchesExpressionTestCase(ETHContractTestCase): class MatchesExpressionTestCase(ETHContractTestCase):
@ -44,4 +44,4 @@ class GetXrefsTestCase(ETHContractTestCase):
xrefs = contract.get_xrefs() xrefs = contract.get_xrefs()
self.assertEqual(xrefs[0], "c3b2ae46792547a96b9f84405e36d0e07edcd05c", 'Error getting xrefs from contract') self.assertEqual(xrefs[0], "0xc3b2ae46792547a96b9f84405e36d0e07edcd05c", 'Error getting xrefs from contract')
Loading…
Cancel
Save