Angr Control Flow Deobfuscation
Learning about Angr for deobfuscation
- Overview
- The Concept
- Initial Analysis
- An Emulation Approach (For Comparison)
- A Symbolic Execution Approach
- Complete Solution from @mrexodia
- CFF State Machine Analysis
- Complete Solution
- Build Patching Script in IDA Python
- TODO
Overview
Today we are going to learn a bit about how to use angr for control flow deobfuscation. We are using the Pandora Ransomware sample as an example.
Sample (packed)
5b56c5d86347e164c6e571c86dbf5b1535eae6b979fede6ed66b01e79ea33b7b
Unpacked sample (we will be using this as our example)
2619862c382d3e375f13f3859c6ab44db1a4bce905b4a617df2390fbf36902e7
The Concept
We have a binary that has been obfuscated using control flow flattening (CFF). The original control flow (CF) of each function has been replaced by a state machine. Each of the original code basic blocks (BB) has been given a state. To transition between the basic blocks a dispatcher is used which routes the control flow based on the current state.
Before CFF
After CFF
Removing CFF
The theory behind removing CFF is to identify the state of each original BB, and the "next" state(s) that each original BB along with the condition that sets each of these states.
Basic Block State The BB state can be thought of as -- what state must be put INTO the dispatcher to route to the BB.
Next State The "next" state depends on whether the BB has conditional control flow. If there is no condition then there will only be ONE next state. However, if there is conditional control flow there will be TWO next states with a dependency on some condition within the BB.
In practice determining theses states and next states can be difficult as the dispatcher may be complex and the conditions used to set the next state may also be complex.
Initial Analysis
Before we can begin analyzing the CFF state machine we need to extract the following initial information from the function.
- The function entrypoint (this is assumed)
- The address of the DISPATCHER start
- It may be possible to determine this heuristically as many bb will jmp to this address
- The STATE variable
- The STATE variable is the variable used to pass the STATE to the DISPATCHER. In simple CFF cases the same variable is used throughout the original code blocks but in more complex CFF this cannot be relied on.
- The address of each of the original code basic blocks
- This may not be needed initially depending on the strategy. It maybe be possible to recover these during symbolic execution.
An Emulation Approach (For Comparison)
To associate a state with each original code block we can first collect a list of states using some simple assembly pattern matching in the original code blocks (the state will be moved to EAX). Then start emulation at the dispatcher start and run an emulation for each state stopping on the first jmp to an original code block -- this is the state for that block. Then once we have the states associate with each block we can manually connect up the original basic blocks and remove the dispatcher.
This approach has many drawbacks, but the main issues are finding the states using pattern matching. Depending on how the states are set it may not be as simple as collecting all of the states by looking for a mov eax, immediate. Also, once the states are assigned to the basic blocks determining what conditions trigger which next state for each bb may also prove complex and very brittle (less re-usable code)
A Symbolic Execution Approach
With symbolic execution we are not limited to the constraints of the variables, but can "symbolically execute" over all possible paths between the dispatcher and the original basic blocks. This allows us to test all possible options for the STATE variable. T
Associating A STATE With Each Original Basic Block
The first step is to associate a STATE with a an original basic block (OBB). We have an advantage with the state machine in that we know the initial state (it muste either be set in the entrpoint or passed into the funcation as an argument). Once we know the initial state we can use this to execute until we reach an OBB. We can now associated this state with the OBB
Determining Next STATE(S)
Once we have reached the start of an OBB we can symbolically execute until we reach the dispatcher again. Once we reach the dispatcher we can query the symbolic equation for possible value(s) of the STATE. There may be multiple values depending on the conditional logic in the OBB. These STATE values are the next states and will allow us to further interrogate the function to associated with with more OBBs.
CFF State Machine Parsing Algorithm
The full algorithm for parsing the CFF state machine is described below.
- Use initial STATE and execute from the start of the DISPATCHER until reaching an OBB
- Assocated this STATE with the OBB
- Continue executing until reaching the DISPATCHER
- Solve the symbolic equation for the STATE(S) to determine the NEXT-STATE(s) of the OBB
- Associate these NEXT-STATE(s) with the OBB (STATE -> OBB -> NEXT-STATE)
- Repeate the process for each NEXT-STATE treatin each one as the new STATE
References
- angr docs
- angr cheat sheet
- z3
- OST2 - Reverse Engineering 3201: Symbolic Analysis
- Unpacked Pandora sample
2619862c382d3e375f13f3859c6ab44db1a4bce905b4a617df2390fbf36902e7
- Control Flow Flattening
- Deobfuscation - Recovering an ollvm
- stadeo deobfuscation tool
- Control Flow Unflattening
- Analysis of Virtualization-based Obfuscation (r2con2021workshop)
Complete Solution from @mrexodia
In this approach less initial information is needed about the binary. Also this approach will work on the un-patched binary (something I didn't realize was possible with angre). One of the advantage of Angr is that it is able to read and interact with the hard-coded jmp table without additional hooks/patches.
All creadit for this method goes to mrexodia who developed the original code and proof of concept for this approach.
import angr
proj = angr.Project("/tmp/pandora_dump_SCY.bin", load_options={'auto_load_libs': False})
def get_dispatcher_state(function, dispatcher):
state = proj.factory.call_state(addr=function)
# Ignore function calls
# https://github.com/angr/angr/issues/723
state.options.add(angr.options.CALLLESS)
simgr = proj.factory.simulation_manager(state)
# Find the dispatcher
while True:
simgr.step()
assert len(simgr.active) == 1
state = simgr.active[0]
if state.addr == dispatcher:
return state.copy()
addr_main = 0x7FF6C4B066F0
addr_dispatcher = 0x7ff6c4b067f0
dispatcher_state = get_dispatcher_state(function=addr_main, dispatcher=addr_dispatcher)
print(f"Dispatcher state: {dispatcher_state}")
initial_state = dispatcher_state.solver.eval_one(dispatcher_state.regs.eax)
print(f"Initial eax: {hex(initial_state)}")
# %%
def find_successors(state_value, dispatcher):
state = dispatcher_state.copy()
state.regs.eax = state.solver.BVV(state_value, 32)
simgr = proj.factory.simulation_manager(state)
while True:
print(f"eax: {simgr.active[0].regs.eax}")
print(f"Stepping: {simgr.active} ...")
simgr.step()
# TODO: the block before the dispatcher is wrong, we need the first non-dispatcher block
if len(simgr.active) == 0:
return state, []
assert len(simgr.active) == 1
state2 = simgr.active[0]
print(f" Only found a single sucessor: {hex(state2.addr)}")
if state2.addr == dispatcher:
print(f" Dispatcher, eax: {state2.regs.eax}")
# TODO: figure out where these potential values are set
solutions = state2.solver.eval_upto(state2.regs.eax, 2) # TODO: might need more potential states
return state, solutions
elif state2.addr == 0x7ff6c4b070ea: # HACK: int3 here, no idea how to properly handle it
return state, []
state = state2
from queue import Queue
# state_value => real basic block state
states = {}
q = Queue()
q.put(initial_state)
while not q.empty():
state_value = q.get()
# Skip visited states
if state_value in states:
continue
bb_state, successors = find_successors(state_value, addr_dispatcher)
print(f"{hex(state_value)} {bb_state} => {[hex(n) for n in successors]}")
print()
states[state_value] = bb_state, successors
for state_value in successors:
q.put(state_value)
dot = "digraph CFG {\n"
for state_value in states.keys():
_, succ = states[state_value]
for s in succ:
dot += f"\"{hex(state_value)}\" -> \"{hex(s)}\"\n"
dot += "}"
print(dot)
import angr
import claripy
import struct
BINARY_PATH = '/tmp/pandora_dump_SCY.bin'
entry_point = 0x00007FF6C4B066F0
dispatcher_start = 0x00007FF6C4B067F0
project = angr.Project(BINARY_PATH, load_options={'auto_load_libs': False})
# TODO: We should explicately add the state since we know it (main)
initial_state = project.factory.call_state(addr=entry_point)
# Use this setting to skip calls instead of a hook
initial_state.options.add(angr.options.CALLLESS)
simgr = project.factory.simgr(initial_state)
simgr.active
simgr.run(until=lambda s: s.active[0].addr == dispatcher_start)
print(simgr.active)
initial_dispatcher_state = simgr.active[0]
eax_initial = initial_dispatcher_state.solver.eval_one(initial_dispatcher_state.regs.eax)
print(f"Initial state address {initial_dispatcher_state}")
print(f"Initial eax: {hex(eax_initial)}")
# Save the initial state at the dispatcher
initial_dispatcher_state = simgr.active[0]
class Flags:
def __init__(self, register):
self.CF = False
self.PF = False
self.AF = False
self.ZF = False
self.SF = False
self.TF = False
self.IF = False
self.DF = False
self.OF = False
if register & 0x0001 == 0x0001:
self.CF = True
if register & 0x0004 == 0x0004:
self.PF = True
if register & 0x0010 == 0x0010:
self.AF = True
if register & 0x0040 == 0x0040:
self.ZF = True
if register & 0x0080 == 0x0080:
self.SF = True
if register & 0x0100 == 0x0100:
self.TF = True
if register & 0x0200 == 0x0200:
self.IF = True
if register & 0x0400 == 0x0400:
self.DF = True
if register & 0x0800 == 0x0800:
self.OF = True
state = initial_dispatcher_state.copy()
# Original code bb addresses
orig_code_bb = [0x7ff6c4b0687f, 0x7ff6c4b0691a, 0x7ff6c4b06a04, 0x7ff6c4b06a84, 0x7ff6c4b06ad1, 0x7ff6c4b06b3b, 0x7ff6c4b06b9b, 0x7ff6c4b06bf3, 0x7ff6c4b06c61, 0x7ff6c4b06cfd, 0x7ff6c4b06e9f, 0x7ff6c4b06ef8, 0x7ff6c4b06f58, 0x7ff6c4b07024]
ret_bb = 0x7FF6C4B0706E
orig_code_bb.append(ret_bb)
# Set symbol for state register eax (32 bit)
# eax_state = claripy.BVS('eax_state', 4*8)
# state.memory.store(state.regs.eax, eax_state)
end_bb = 0x7FF6C4B0706E
orig_bb_1_start = 0x00007FF6C4B0691A
detatched_bb = 0x00007FF6C4B070D1
interrupt_bb = 0x00007FF6C4B070EA
# Run from initial state until we hit an original code block
simgr_dispatcher = project.factory.simgr(state)
while True:
simgr_dispatcher.step()
print(simgr_dispatcher)
if len(simgr_dispatcher.active) != 1:
print(f"Multiple states!")
break
if simgr_dispatcher.active[0].addr in orig_code_bb:
print(f"Found original code block")
break
print(f"Step: {hex(simgr_dispatcher.active[0].addr)}")
print(simgr_dispatcher)
state_1_end = simgr_dispatcher.active[0]
state_1_eax = eax_initial
state_1_bb = state_1_end.addr
print(f"state_1_eax:{hex(state_1_eax)} -> bb: {hex(state_1_bb)}")
# Run until dispatcher to get new state(s)
while True:
simgr_dispatcher.step()
print(simgr_dispatcher)
if len(simgr_dispatcher.active) != 1:
print(f"Multiple states!")
break
if simgr_dispatcher.active[0].addr == dispatcher_start:
print(f"Found dispatcher")
break
print(f"Step: {hex(simgr_dispatcher.active[0].addr)}")
# This is the start of state 2
state_2 = simgr_dispatcher.active[0]
# Retrieve all potential eax values
state_2_eax_values = state_2.solver.eval_upto(state_2.regs.eax, 8)
print(f"Number of possible state values: {len(state_2_eax_values)}")
# If there is more than one next state that means our code block will require a conditional jmp to replace
# the state machine, we will need to find end process both next states as well as determine what conditions
# cause each state
for state_2_eax_value in state_2_eax_values:
print(f"state_2_eax:{hex(state_2_eax_value)}")
# For each eax state we are going to need the associated flags to eventually replace the conditional
# cmov with a conditional jmp
flags_values = state_2.solver.eval_upto(state_2.regs.flags, 2, extra_constraints=[state_2.regs.eax == state_2_eax_value])
if len(flags_values) == 1:
# This is the constrained state we must save these flags with the state
print(f"Constrained state: {hex(state_2_eax_value)}")
flags = flags_values[0]
print(f"flags:{hex(flags)}")
f = Flags(flags)
print(f"CF: {f.CF}")
print(f"PF: {f.PF}")
print(f"AF: {f.AF}")
print(f"ZF: {f.ZF}")
print(f"SF: {f.SF}")
print(f"TF: {f.TF}")
print(f"IF: {f.IF}")
print(f"DF: {f.DF}")
print(f"OF: {f.OF}")
else:
# This is the unconstrained state, we don't need to save anything
print(f"Unconstrained state: {hex(state_2_eax_value)}")
# and process both eax states seperately to link each one with their basic block
state_2_1 = state_2.copy()
state_2_1_eax = state_2_eax_values[0]
# Set eax back to a concrete value choosing state_2_1
state_2_1.regs.eax = state.solver.BVV(state_2_1_eax, 32)
state_2_2 = state_2.copy()
state_2_2_eax = state_2_eax_values[1]
# Set eax back to a concrete value choosing state_2_2
state_2_2.regs.eax = state.solver.BVV(state_2_2_eax, 32)
# Solve: state_2_1
########################
simgr_dispatcher_2_1 = project.factory.simgr(state_2_1)
stop_flag = False
while True:
simgr_dispatcher_2_1.step()
print(simgr_dispatcher_2_1)
if len(simgr_dispatcher_2_1.active) == 0:
print(f"Dead State!")
stop_flag = True
break
if len(simgr_dispatcher_2_1.active) > 1:
print(f"Multiple states: {len(simgr_dispatcher_2_1.active)}!")
stop_flag = True
break
if simgr_dispatcher_2_1.active[0].addr == detatched_bb:
print(f"Found detached block: {hex(simgr_dispatcher_2_1.active[0].addr)}!")
stop_flag = True
break
if simgr_dispatcher_2_1.active[0].addr in orig_code_bb:
print(f"Found original code block")
break
print(f"Step: {hex(simgr_dispatcher_2_1.active[0].addr)}")
if not stop_flag:
state_2_1_end = simgr_dispatcher_2_1.active[0]
state_2_1_bb = state_2_1_end.addr
print(f"state_1_eax:{hex(state_2_1_eax)} -> bb: {hex(state_2_1_bb)}")
print(f"** Find next state(s)")
# This is the start of state 3
state_3 = simgr_dispatcher_2_1.active[0]
# Retrieve all potential eax values
state_3_eax_values = state_3.solver.eval_upto(state_3.regs.eax, 8)
print(f"Number of possible state values: {len(state_3_eax_values)}")
# If there is more than one next state that means our code block will require a conditional jmp to replace
# the state machine, we will need to find end process both next states as well as determine what conditions
# cause each state
if len(state_3_eax_values) == 1:
state_3_eax_value = state_3_eax_values[0]
print(f"Unconditonal JMP - State eax: {hex(state_3_eax_value)}")
else:
for state_3_eax_value in state_3_eax_values:
print(f"state_3_eax:{hex(state_3_eax_value)}")
# For each eax state we are going to need the associated flags to eventually replace the conditional
# cmov with a conditional jmp
flags_values = state_3.solver.eval_upto(state_3.regs.flags, 2, extra_constraints=[state_3.regs.eax == state_3_eax_value])
if len(flags_values) == 1:
# This is the constrained state we must save these flags with the state
print(f"Constrained state: {hex(state_3_eax_value)}")
flags = flags_values[0]
print(f"flags:{hex(flags)}")
f = Flags(flags)
print(f"CF: {f.CF}")
print(f"PF: {f.PF}")
print(f"AF: {f.AF}")
print(f"ZF: {f.ZF}")
print(f"SF: {f.SF}")
print(f"TF: {f.TF}")
print(f"IF: {f.IF}")
print(f"DF: {f.DF}")
print(f"OF: {f.OF}")
else:
# This is the unconstrained state, we don't need to save anything
print(f"Unconstrained state: {hex(state_3_eax_value)}")
# Solve: state_2_2 (good one to copy)
#####################################
simgr_dispatcher_2_2 = project.factory.simgr(state_2_2)
stop_flag = False
while True:
simgr_dispatcher_2_2.step()
print(simgr_dispatcher_2_2)
if len(simgr_dispatcher_2_2.active) == 0:
print(f"Dead State!")
stop_flag = True
break
if len(simgr_dispatcher_2_2.active) > 1:
print(f"Multiple states: {len(simgr_dispatcher_2_2.active)}!")
stop_flag = True
break
if simgr_dispatcher_2_2.active[0].addr == detatched_bb:
print(f"Found detached block: {hex(simgr_dispatcher_2_2.active[0].addr)}!")
stop_flag = True
break
if simgr_dispatcher_2_2.active[0].addr in orig_code_bb:
print(f"Found original code block")
break
print(f"Step: {hex(simgr_dispatcher_2_2.active[0].addr)}")
if stop_flag:
print(f"Stopped on bb: {hex(state_2_2_eax)}")
else:
state_2_2_end = simgr_dispatcher_2_2.active[0]
state_2_2_bb = state_2_2_end.addr
print(f"state_1_eax:{hex(state_2_2_eax)} -> bb: {hex(state_2_2_bb)}")
print(f"** Find next state(s)")
# Run until dispatcher to get new state(s)
ob_dead_flag = False
while True:
simgr_dispatcher_2_2.step()
print(simgr_dispatcher_2_2)
if len(simgr_dispatcher_2_2.active) == 0:
print(f"Dead end!")
ob_dead_flag = True
break
if len(simgr_dispatcher_2_2.active) > 1:
print(f"Multiple states in original bb {len(simgr_dispatcher_2_2.active)}!")
ob_dead_flag = True
break
if simgr_dispatcher_2_2.active[0].addr == dispatcher_start:
print(f"Found dispatcher")
break
print(f"Step: {hex(simgr_dispatcher.active[0].addr)}")
if not ob_dead_flag:
# This is the start of state 3
state_3_2 = simgr_dispatcher_2_2.active[0]
# Retrieve all potential eax values
state_3_2_eax_values = state_3_2.solver.eval_upto(state_3_2.regs.eax, 8)
print(f"Number of possible state values: {len(state_3_2_eax_values)}")
# If there is more than one next state that means our code block will require a conditional jmp to replace
# the state machine, we will need to find end process both next states as well as determine what conditions
# cause each state
if len(state_3_2_eax_values) == 1:
state_3_2_eax_value = state_3_2_eax_values[0]
print(f"Unconditonal JMP - State eax: {hex(state_3_2_eax_value)}")
else:
for state_3_2_eax_value in state_3_2_eax_values:
print(f"state_3_2_eax:{hex(state_3_2_eax_value)}")
# For each eax state we are going to need the associated flags to eventually replace the conditional
# cmov with a conditional jmp
flags_values = state_3_2.solver.eval_upto(state_3_2.regs.flags, 2, extra_constraints=[state_3_2.regs.eax == state_3_2_eax_value])
if len(flags_values) == 1:
# This is the constrained state we must save these flags with the state
print(f"Constrained state: {hex(state_3_2_eax_value)}")
flags = flags_values[0]
print(f"flags:{hex(flags)}")
f = Flags(flags)
print(f"CF: {f.CF}")
print(f"PF: {f.PF}")
print(f"AF: {f.AF}")
print(f"ZF: {f.ZF}")
print(f"SF: {f.SF}")
print(f"TF: {f.TF}")
print(f"IF: {f.IF}")
print(f"DF: {f.DF}")
print(f"OF: {f.OF}")
else:
# This is the unconstrained state, we don't need to save anything
print(f"Unconstrained state: {hex(state_3_2_eax_value)}")
import angr
import claripy
import struct
from queue import Queue
BINARY_PATH = '/tmp/pandora_dump_SCY.bin'
# Information about the function which we can get from IDA
entry_point = 0x00007FF6C4B066F0
dispatcher_start = 0x00007FF6C4B067F0
# Original code bb addresses
orig_code_bb = [0x7ff6c4b0687f, 0x7ff6c4b0691a, 0x7ff6c4b06a04, 0x7ff6c4b06a84, 0x7ff6c4b06ad1, 0x7ff6c4b06b3b, 0x7ff6c4b06b9b, 0x7ff6c4b06bf3, 0x7ff6c4b06c61, 0x7ff6c4b06cfd, 0x7ff6c4b06e9f, 0x7ff6c4b06ef8, 0x7ff6c4b06f58, 0x7ff6c4b07024]
ret_bb = 0x7FF6C4B0706E
orig_code_bb.append(ret_bb)
detatched_bb = 0x00007FF6C4B070D1
interrupt_bb = 0x00007FF6C4B070EA
project = angr.Project(BINARY_PATH, load_options={'auto_load_libs': False})
# Start at function entrypoint
initial_state = project.factory.call_state(addr=entry_point)
# Use this setting to skip calls instead of a hook
initial_state.options.add(angr.options.CALLLESS)
simgr = project.factory.simgr(initial_state)
# Explore to dispatcher start
simgr.run(until=lambda s: s.active[0].addr == dispatcher_start)
print(simgr.active)
# Save the initial state at the dispatcher
initial_dispatcher_state = simgr.active[0].copy()
# Get initial state for eax
eax_initial = initial_dispatcher_state.solver.eval_one(initial_dispatcher_state.regs.eax)
print(f"Initial state address {initial_dispatcher_state}")
print(f"Initial eax: {hex(eax_initial)}")
def find_obb(eax_state):
#
# Returns the state, and the address of the obb (state,obb_address)
# ** Will return NONE for obb unreachable (caller must handle this)
# ** Will detached bb address if detached block found (caller must handle this)
#
return_state = None
obb_address = None
# Make a copy of the state so we can restore the original if we need
state = initial_dispatcher_state.copy()
# Set the eax value for the state
state.regs.eax = state.solver.BVV(eax_state, 32)
# Build a sim manager for our state
simgr = project.factory.simgr(state)
# Add danger limit for our loop
danger_limit = 0
# Step until we hit a obb or we can't reach an obb
while danger_limit <= 10000:
danger_limit += 1
simgr.step()
if len(simgr.active) == 0:
#print(f"Dead State!")
# Return nothing for dead state
return None,None
if len(simgr.active) > 1:
#print(f"Multiple states: {len(simgr.active)}!")
# Return nothing for branched state -- this shouldn't happen
return None,None
if simgr.active[0].addr == detatched_bb:
#print(f"Found detached block: {hex(simgr.active[0].addr)}!")
# Return the
return_state = None
obb_address = simgr.active[0].addr
return return_state, obb_address
if simgr.active[0].addr in orig_code_bb:
#print(f"Found original code block")
return simgr.active[0].copy(),simgr.active[0].addr
print(f"Step: {hex(simgr.active[0].addr)}")
def find_dispatcher(current_state):
#
# Returns the next state values and associated flags [(state_value,flags)]
# ** if the state is unconstrained the flags will be NONE
# ** if the dispatcher is unreachable we return an empty set []
state_values = []
# Make a copy of the state so we can restore the original if we need
state = current_state.copy()
# Build sim manager for state
simgr = project.factory.simgr(state)
# Add danger limit for our loop
danger_limit = 0
# Step until we hit dispatcher
while danger_limit <= 10000:
danger_limit += 1
simgr.step()
if len(simgr.active) == 0:
#print(f"Dead end!")
return []
if len(simgr.active) > 1:
#print(f"Multiple states in original bb {len(simgr.active)}!")
return []
if simgr.active[0].addr == dispatcher_start:
#print(f"Found dispatcher")
break
print(f"Step: {hex(simgr.active[0].addr)}")
# If we got here we have some next states for the obb let's solve for them
current_state= simgr.active[0]
# Retrieve all potential eax values
eax_values = current_state.solver.eval_upto(current_state.regs.eax, 8)
#print(f"Number of possible state values: {len(eax_values)}")
# If there is more than one next state that means our code block will require a conditional jmp to replace
# the state machine, we will need to find end process both next states as well as determine what conditions
# cause each state
if len(eax_values) == 1:
eax_value = eax_values[0]
#print(f"Unconditonal JMP - State eax: {hex(eax_value)}")
state_values.append((eax_value,None))
else:
for eax_value in eax_values:
#print(f"eax_value:{hex(eax_value)}")
# For each eax state we are going to need the associated flags to eventually replace the conditional
# cmov with a conditional jmp
flags_values = current_state.solver.eval_upto(current_state.regs.flags, 2, extra_constraints=[current_state.regs.eax == eax_value])
# if len(flags_values) == 1:
# This is the constrained state we must save these flags with the state
#print(f"Constrained state: {hex(eax_value)}")
flags = flags_values[0]
state_values.append((eax_value,flags))
# else:
# # This is the unconstrained state, we don't need to save anything
# #print(f"Unconstrained state: {hex(eax_value)}")
# state_values.append((eax_value,None))
return state_values
def get_state_info(state_value):
# (obb_address, [(state,flags)])
obb_state,obb_address = find_obb(state_value)
# Check if we hit a dead state
if obb_address == None:
print(f"Dead state: {hex(state_value)}")
return None
# Check if we a detached bb
elif obb_address == detatched_bb:
print(f"Detached BB: State:{hex(state_value)} -> bb: {hex(obb_address)}")
return (obb_address, [])
else:
print(f"State:{hex(state_value)} -> bb: {hex(obb_address)}")
# Find next states
next_states = find_dispatcher(obb_state)
# Check if end code block (no path to dispatcher)
if len(next_states) == 0:
print(f"{hex(obb_address)} is end state!")
return (obb_address, [])
# Check if this is an unconditional jmp
elif len(next_states) == 1:
print(f"{hex(obb_address)} -> jmp state:{hex(next_states[0][0])}")
return (obb_address, next_states)
# If there are multiple states print the conditions
else:
for next_state in next_states:
if next_state[1] == None:
# This is unconditional jmp
print(f"{hex(obb_address)} -> jmp state:{hex(next_state[0])}")
else:
# This is a conditional jmp
print(f"{hex(obb_address)} -> conditional jmp state:{hex(next_state[0])}")
return (obb_address, next_states)
# state_table[state] = (obb_address, [(state,flags)])
state_table = {}
q = Queue()
q.put(eax_initial)
while not q.empty():
state_value = q.get()
state_info = get_state_info(state_value)
if state_info is not None:
state_table[state_value] = state_info
# If we have a new state add it to the queue
for next_state in state_info[1]:
next_state_value = next_state[0]
if next_state_value not in state_table:
q.put(next_state_value)
state_table
Build Patching Script in IDA Python
For each obb look up the state_info
- if not conditional jmp then patch last bytes in obb with jmp -> state (lookup state address)
- if conditional then read in the last few bytes of the obb and try to determin the condition
- compare the condition with the saved flags for each state
- add one conditional jmp based on the flags
- add one unconditional jmp for the other state
class Flags:
def __init__(self, register):
self.CF = False
self.PF = False
self.AF = False
self.ZF = False
self.SF = False
self.TF = False
self.IF = False
self.DF = False
self.OF = False
if register & 0x0001 == 0x0001:
self.CF = True
if register & 0x0004 == 0x0004:
self.PF = True
if register & 0x0010 == 0x0010:
self.AF = True
if register & 0x0040 == 0x0040:
self.ZF = True
if register & 0x0080 == 0x0080:
self.SF = True
if register & 0x0100 == 0x0100:
self.TF = True
if register & 0x0200 == 0x0200:
self.IF = True
if register & 0x0400 == 0x0400:
self.DF = True
if register & 0x0800 == 0x0800:
self.OF = True
state_table = {2361132084: (140697838577791, [(2107475699, 0), (389785057, 68)]),
2107475699: (140697838579921, []),
389785057: (140697838578941, [(2056782390, None)]),
2056782390: (140697838579448, [(280783992, 149), (2720667311, 148)]),
280783992: (140697838577946, [(2056782390, None)]),
2720667311: (140697838578785, [(1020697904, 0), (2104599011, 68)]),
1020697904: (140697838579748, [(3272322606, None)]),
2104599011: (140697838578675, [(2727945767, None)]),
3272322606: (140697838578180, [(1814337361, 148), (992709150, 149)]),
2727945767: (140697838578385, [(3972960241, None)]),
1814337361: (140697838578308, [(2104599011, None)]),
992709150: (140697838578491, [(3560944452, 68), (3230979785, 0)]),
3972960241: (140697838579822, []),
3560944452: (140697838578587, [(3272322606, None)]),
3230979785: (140697838579544, [(3560944452, None)])}
orig_code_bb = [0x7ff6c4b0687f, 0x7ff6c4b0691a, 0x7ff6c4b06a04, 0x7ff6c4b06a84, 0x7ff6c4b06ad1, 0x7ff6c4b06b3b, 0x7ff6c4b06b9b, 0x7ff6c4b06bf3, 0x7ff6c4b06c61, 0x7ff6c4b06cfd, 0x7ff6c4b06e9f, 0x7ff6c4b06ef8, 0x7ff6c4b06f58, 0x7ff6c4b07024]
entry_point = 0x00007FF6C4B066F0
ret_bb = 0x7FF6C4B0706E
detatched_bb = 0x00007FF6C4B070D1
# Add these blocks so we test them as well
orig_code_bb.append(entry_point)
orig_code_bb.append(ret_bb)
orig_code_bb.append(detatched_bb)
# we also need to fake an initial block so that the entrypoint has some state info
# We know that it jumps to the initial state so just add it manually
# Initial eax: 0x8cbc0434
state_table[0xffff] = (0x00007FF6C4B066F0, [0x8cbc0434, None])
for obb_addr in orig_code_bb:
for state in state_table:
state_info = state_table[state]
if obb_addr == state_info[0]:
# Found the obb info
if len(state_info[1]) == 0:
# This is an end state do nothing
break
elif len(state_info[1]) == 1:
# Unconditional jmp
# Determine next_obb address from next_state
next_state = state_info[1][0][0]
next_obb_address = state_table[next_state][0]
# Iterate through obb until we hit the jmp
# replace it with the new state obb address
ptr = obb_addr
while print_insn_mnem(ptr) != 'jmp':
ptr = next_head(ptr)
jmp_rel = next_obb_address - (ptr + 5)
patch_jmp = b'\xe9' + struct.pack('<i',jmp_rel)
#print(f"Unconditional patch {patch_jmp} at {hex(ptr)}")
# Patch bytes
ida_bytes.patch_bytes(ptr,patch_jmp)
break
else:
# Conditional jmp
print(f"obb: {hex(obb_addr)}")
ptr = obb_addr
conditional_jmp_instruction = None
conditional_jmp_address = None
unconditional_jmp_address = None
while print_insn_mnem(ptr) != 'jmp':
instruction = print_insn_mnem(ptr)
if "cmovnz" == instruction:
# Use a JNZ (ZF=0)
# Check the flags for both next states
# Make sure on one flag set statisfies condition
next_state_info_1 = state_info[1][0]
next_state_info_2 = state_info[1][1]
f = Flags(next_state_info_1[1])
conditional_jmp_instruction = b'\x0F\x85'
if not f.ZF:
# Found our conditional state next_state_info_1
conditional_jmp_address = state_table[next_state_info_1[0]][0]
unconditional_jmp_address = state_table[next_state_info_2[0]][0]
else:
# Found our conditional state next_state_info_2
conditional_jmp_address = state_table[next_state_info_2[0]][0]
unconditional_jmp_address = state_table[next_state_info_1[0]][0]
if "cmovb" == instruction:
# Use a JB (CF=1)
# Check the flags for both next states
# Make sure on one flag set statisfies condition
next_state_info_1 = state_info[1][0]
next_state_info_2 = state_info[1][1]
f = Flags(next_state_info_1[1])
conditional_jmp_instruction = b'\x0F\x82'
if f.CF:
# Found our conditional state next_state_info_1
conditional_jmp_address = state_table[next_state_info_1[0]][0]
unconditional_jmp_address = state_table[next_state_info_2[0]][0]
else:
# Found our conditional state next_state_info_2
conditional_jmp_address = state_table[next_state_info_2[0]][0]
unconditional_jmp_address = state_table[next_state_info_1[0]][0]
# Increment to next instruction
ptr = next_head(ptr)
# Build patch for conditional jmp
jmp_rel = conditional_jmp_address - (ptr + 6)
patch_cond_jmp = conditional_jmp_instruction + struct.pack('<i',jmp_rel)
# Patch bytes
#print(f"Conditional patch (cond) {patch_cond_jmp} at {hex(ptr)}")
ida_bytes.patch_bytes(ptr,patch_cond_jmp)
ptr += 6
# Build patch for unconditional jmp
jmp_rel = unconditional_jmp_address - (ptr + 5)
patch_jmp = b'\xe9' + struct.pack('<i',jmp_rel)
# Patch bytes
ida_bytes.patch_bytes(ptr,patch_jmp)
#print(f"Conditional patch (uncon) {patch_jmp} at {hex(ptr)}")
break
TODO
There are a few things we can improve on. First, we seem to have some orphaned obbs once we do the patching. This is likely becuase we didn't test one of the states.
This means that we have some error in our path traversal through the dispatcher. All possibilites should be reachable from the entrypoint (by definition).
Possibly we need to keep the sim manager state for new eax state we test instead of resetting it.