Threat Intel - Building A Simple Botnet Tracker
Introduction to threat intel
Corporate Security
A mature corporate security program needs a way to track its success and plan for the future. This is where threat intel plays its largest role. The threat intel product can provide a picture of the current and emerging threats faced by an organization, and it can also provide operational support for the organization's security controls (data feeds for blocklists, etc.).
Corporate Consumption of The Intel Product
The corporate value proposition for threat intel is simple: we give you a picture of the threats you face and how they operate so you can protect yourself. In practice the intel product itself takes many forms, each oriented towards a different consumer within the organization.
Intel Production Process
The threat intel production process can be visualized as a funnel with raw data consumed at the opening of the funnel and finished intel product produced at the narrow end. In practice the customer requirements usually drive the finished intelligence product while the internal intel process may drive the raw data collection.
Finished Intelligence
With each step in the intelligence production pipeline the information is refined and enriched to provide a more informed and compelling picture of the current threat landscape. Technical reports produced by reverse engineers at the technical analysis layer may be complete enough to be consumed directly by technical functions within the customer organization. Intelligence analysts sit at the narrow end of the intelligence funnel and are not necessarily technical. The final product from an intelligence analyst can often be summarized for briefing at the C-level of the customer organization.
Operational Intelligence
With the emergence of the detection engineering role (both within the customer organization and within the intel production pipeline) there is also a secondary funnel. The primary role of the detection engineer within the intel pipeline is to produce raw intelligence (one step above data) that is machine consumable by security controls (rules, IOCs, etc.). The primary role of the detection engineer within the customer organization is to consume this raw intelligence and ensure it is fed into their security controls. This hybrid role forms a synergy (cringe) between the intel production pipeline and the security control products (EDR, firewall, etc.). Sometimes this secondary funnel is referred to as operational intelligence.
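As a rough illustration of what "machine consumable" raw intelligence might look like, the sketch below turns a list of URLs into newline-delimited JSON records that a blocklist importer could ingest. The record fields (`type`, `value`, `family`, `first_seen`) and the function name `urls_to_ioc_feed` are assumptions made for this example, not a standard feed format.

```python
import json
from datetime import datetime, timezone

def urls_to_ioc_feed(urls, family, first_seen=None):
    """Serialize URLs as newline-delimited JSON IOC records (hypothetical schema)."""
    if first_seen is None:
        first_seen = datetime.now(timezone.utc).isoformat()
    records = []
    # Strip whitespace, drop empties, deduplicate, and sort for a stable feed
    for url in sorted(set(u.strip() for u in urls if u and u.strip())):
        records.append(json.dumps(
            {"type": "url", "value": url, "family": family, "first_seen": first_seen},
            sort_keys=True,
        ))
    return "\n".join(records)

feed = urls_to_ioc_feed(
    ["https://example.com/a ", "https://example.com/a", "https://example.org/b"],
    family="DbatLoader",
    first_seen="2022-10-01T00:00:00+00:00",
)
print(feed)
```

Sorting and deduplicating before serialization keeps repeated runs from producing noisy diffs in whatever consumes the feed.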
Simple DbatLoader Tracker
Our tracker will be responsible for pulling down the payloads deployed by DbatLoader. Each loader sample contains a download URL which can be used to download a unique (I think?) payload. The payloads are encrypted with a simple format that we reverse engineered on a past stream.
The Tracker Architecture
- Use the UnpacMe feed to pull all new DbatLoader URLs
- Download the payloads
- Decrypt them and ???
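The steps above can be sketched as a small pipeline in which each stage is passed in as a function. This is only one possible shape for the tracker; `run_tracker` and its parameter names are hypothetical, and the stand-in stages in the usage example exist only so the sketch runs without network access.

```python
def run_tracker(fetch_urls, download, decrypt):
    """Run the three tracker stages; each stage is an injected callable.

    fetch_urls() -> iterable of (sample_sha256, url)
    download(url) -> bytes or None
    decrypt(data) -> bytes or None
    """
    results = {}
    for sample_sha256, url in fetch_urls():
        data = download(url)
        if data is None:
            continue  # dead URL, nothing to decrypt
        payload = decrypt(data)
        if payload is None:
            continue  # download did not decrypt cleanly
        results[sample_sha256] = payload
    return results

# Stand-in stages (not real feed data) so the sketch runs offline
def fake_feed():
    return [("aa" * 32, "https://example.com/live"),
            ("bb" * 32, "https://example.com/dead")]

def fake_download(url):
    return b"daolyap" if url.endswith("/live") else None

def fake_decrypt(data):
    return data[::-1]

tracked = run_tracker(fake_feed, fake_download, fake_decrypt)
print(tracked)
```

Passing the stages in as callables keeps the control flow testable on its own, separate from the API client and the decryption routines.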
TODO
import hashlib
import os

import requests

# The UnpacMe API key is read from the environment (None if API_KEY is not set)
api_key = os.environ.get('API_KEY')

sample_id = 'c37e0dc8-934f-4f61-b3c9-9cdbc4ca6be5'
def get_c2_from_sample(sample_id):
    """Return the unique C2 URLs from the extracted config of one UnpacMe analysis."""
    url = f"https://api.unpac.me/api/v1/private/results/{sample_id}"
    headers = {"Authorization": api_key}
    response = requests.get(url, headers=headers)
    response_json = response.json()
    c2s = []
    for result in response_json.get('results', []):
        if 'config' in result:
            config = result.get('config', {}).get('config', {})
            for c2 in config.get('c2s', []):
                # Only URL-type entries are download locations
                if c2.get('type') == 'url':
                    # Named c2_url so it does not shadow the API url above
                    c2_url = c2.get('value')
                    if c2_url is not None:
                        c2s.append(c2_url)
    return list(set(c2s))
get_c2_from_sample(sample_id)
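To make the response shape that `get_c2_from_sample` walks easier to see, the sketch below applies the same traversal to a hand-written dictionary. The sample data is made up for illustration and only mirrors the keys the function reads (`results`, `config`, `c2s`, `type`, `value`); the real API response may contain other fields. `extract_c2_urls` is a hypothetical helper name.

```python
def extract_c2_urls(response_json):
    """Collect unique URL-type C2 values from a results-style response dict."""
    urls = set()
    for result in response_json.get("results", []):
        # Results without a config simply contribute nothing
        config = result.get("config", {}).get("config", {})
        for c2 in config.get("c2s", []):
            if c2.get("type") == "url" and c2.get("value") is not None:
                urls.add(c2["value"])
    return sorted(urls)

# Made-up data that mirrors only the keys read above
mock_response = {
    "results": [
        {"config": {"config": {"c2s": [
            {"type": "url", "value": "https://example.com/payload"},
            {"type": "ip", "value": "192.0.2.1"},
        ]}}},
        {"config": {"config": {"c2s": [
            {"type": "url", "value": "https://example.com/payload"},
        ]}}},
        {"note": "a result with no config"},
    ]
}
print(extract_c2_urls(mock_response))
```

Separating the parsing from the HTTP call makes it possible to check the traversal against saved responses without an API key.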
def get_c2s_from_feed():
    """Yield (sample_sha256, c2_url) pairs for samples in the DbatLoaderStage1 feed."""
    url = 'https://api.unpac.me/api/v1/private/feed/unpacked/yara/DbatLoaderStage1'
    headers = {"Authorization": api_key}
    response = requests.get(url, headers=headers)
    response_json = response.json()
    for sample in response_json.get('submissions', []):
        # Only samples with an extracted config carry C2 URLs
        if sample.get('configs', False):
            sample_sha256 = sample.get('submission_sha256')
            sample_id = sample.get('id')
            if sample_id is not None:
                for c2 in get_c2_from_sample(sample_id):
                    yield sample_sha256, c2
list(get_c2s_from_feed())
c2 = 'https://onedrive.live.com/download?cid=EE3CB851BBF42204&resid=EE3CB851BBF42204%21117&authkey=AP6g5cIxaUzrxIM'
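def get_payload(c2):
    """Download the payload from a C2 URL; return None if the request fails."""
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36 Edg/106.0.1370.34'}
    try:
        response = requests.get(c2, headers=headers, timeout=30)
    except requests.RequestException:
        # Dead hosts, DNS failures, and timeouts are treated as "no payload"
        return None
    if response.ok:
        return response.content
    return None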
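def decrypt_payload(data, key, delim):
    """Decrypt the outer layer of the download and split it into sections."""
    out = []
    for c in data:
        # Odd bytes have the key added, even bytes have it subtracted (mod 256)
        if c & 1 != 0:
            out.append((c + key) & 0xff)
        else:
            out.append((c - key) & 0xff)
    out = bytes(out)
    # The decrypted buffer is stored reversed
    out = out[::-1]
    return out.split(delim)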
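One way to sanity-check `decrypt_payload` without a live sample is to write the inverse transform and confirm a round trip. The `encrypt_payload` helper below is not from the loader; it is derived here from the decryption routine and is only meant for building test inputs. It relies on the observation that adding or subtracting the key flips a byte's parity when the key is odd (as with 217) and preserves it when the key is even. The section contents are arbitrary test data.

```python
def decrypt_payload(data, key, delim):
    # Same logic as the tracker's outer-layer decryption
    out = []
    for c in data:
        if c & 1 != 0:
            out.append((c + key) & 0xff)
        else:
            out.append((c - key) & 0xff)
    return bytes(out)[::-1].split(delim)

def encrypt_payload(sections, key, delim):
    """Hypothetical inverse of decrypt_payload, for building test inputs."""
    plain = delim.join(sections)[::-1]
    out = []
    for p in plain:
        # Work out the parity the encrypted byte must have had
        cipher_was_odd = ((p & 1) ^ (key & 1)) == 1
        if cipher_was_odd:
            out.append((p - key) & 0xff)  # decryption added the key
        else:
            out.append((p + key) & 0xff)  # decryption subtracted the key
    return bytes(out)

key = 217
delim = b'*()%@5YT!@#G__T@#$%^&*()__#@$#57$#!@'
sections = [b'header', b'section-key', b'pad', bytes(range(256)), b'tail']

blob = encrypt_payload(sections, key, delim)
assert decrypt_payload(blob, key, delim) == sections
print("round trip ok,", len(blob), "bytes")
```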
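def addit(data, key):
    """Add the constant (0x112 % key) to every byte, modulo 256."""
    # For example, key 217 gives 0x112 % 217 = 57
    out = []
    for c in data:
        out.append((c + (0x112 % key)) & 0xff)
    return bytes(out)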
def decrypt_yak(data):
    """
    implements the first decryption layer of function 0x416408
    """
    res = bytearray(data)
    for i, c in enumerate(data):
        # Only printable ASCII (0x21-0x7e) is transformed; other bytes pass through
        if 0x21 <= c <= 0x7e:
            res[i] = ((((c + 0xe) % 0x5e) + 0x21) & 0xff)
    return bytes(res)
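The arithmetic in `decrypt_yak` matches ROT47: each printable ASCII character in the range 0x21 to 0x7e is rotated by 47 positions within that 94-character range, since `(c + 0xe) % 0x5e + 0x21` equals `0x21 + ((c - 0x21 + 47) % 94)`. ROT47 is its own inverse, so applying the function twice returns the input. The short check below compares the routine against an independently written ROT47; the test string is arbitrary.

```python
def decrypt_yak(data):
    # Same transform as the tracker's decrypt_yak
    res = bytearray(data)
    for i, c in enumerate(data):
        if 0x21 <= c <= 0x7e:
            res[i] = (((c + 0xe) % 0x5e) + 0x21) & 0xff
    return bytes(res)

def rot47(data):
    """Textbook ROT47 over printable ASCII, written separately for comparison."""
    return bytes(
        33 + ((c - 33 + 47) % 94) if 33 <= c <= 126 else c
        for c in data
    )

sample = b"Hello, World!\x00\xff"
once = decrypt_yak(sample)
print(once)

assert once == rot47(sample)         # same transform
assert decrypt_yak(once) == sample   # self-inverse
```

Because the transform is self-inverse, the same function can be used to re-encode test data for this layer.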
def decrypt_payload_section(section_data, main_key, section_key):
    """Decrypt the inner payload section: XOR, byte addition, reversal, then decrypt_yak."""
    out = []
    key_len = len(section_key)
    section_data_len = len(section_data)
    key_count = 0
    for i in range(section_data_len):
        # XOR with the data length, then with the repeating section key and its length
        tmp_byte = (section_data[i] ^ section_data_len) & 0xff
        out.append((section_key[key_count] ^ key_len ^ tmp_byte) & 0xff)
        key_count = (key_count + 1) % key_len
    payload_out = bytes(out)
    payload_out_dec = addit(payload_out, main_key)
    payload_out_dec = payload_out_dec[::-1]
    return decrypt_yak(payload_out_dec)
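The inner layer can be checked the same way, by undoing each step in reverse order. The block below restates `addit`, `decrypt_yak`, and `decrypt_payload_section` in compact form so it runs on its own, then adds `encrypt_payload_section`, a hypothetical inverse derived from the decryption routine rather than recovered from the loader. The key and plaintext are arbitrary test values.

```python
def addit(data, key):
    return bytes((c + (0x112 % key)) & 0xff for c in data)

def decrypt_yak(data):
    return bytes(
        ((c + 0xe) % 0x5e) + 0x21 if 0x21 <= c <= 0x7e else c
        for c in data
    )

def decrypt_payload_section(section_data, main_key, section_key):
    key_len = len(section_key)
    data_len = len(section_data)
    out = bytes(
        (section_key[i % key_len] ^ key_len ^ b ^ data_len) & 0xff
        for i, b in enumerate(section_data)
    )
    return decrypt_yak(addit(out, main_key)[::-1])

def encrypt_payload_section(plain, main_key, section_key):
    """Hypothetical inverse of decrypt_payload_section, for test data only."""
    key_len = len(section_key)
    data_len = len(plain)  # every layer preserves length
    shift = 0x112 % main_key
    # Undo the layers in reverse order: decrypt_yak (self-inverse), then reversal
    step = decrypt_yak(plain)[::-1]
    # Undo the byte addition
    step = bytes((c - shift) & 0xff for c in step)
    # XOR is its own inverse, so the same key stream is applied again
    return bytes(
        (section_key[i % key_len] ^ key_len ^ b ^ data_len) & 0xff
        for i, b in enumerate(step)
    )

main_key = 217
section_key = b"secret"
plain = b"MZ test payload " + bytes(range(256))

blob = encrypt_payload_section(plain, main_key, section_key)
assert decrypt_payload_section(blob, main_key, section_key) == plain
print("inner layer round trip ok")
```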
def decrypt_download(data, key, delim):
    """Decrypt a downloaded blob; return the payload bytes, or None on failure."""
    out_sections = decrypt_payload(data, key, delim)
    if len(out_sections) <= 4:
        print("Not enough sections decrypted")
        return None
    # Section 1 holds the key for the payload data in section 3
    section_key = out_sections[1]
    section_data = out_sections[3]
    return decrypt_payload_section(section_data, key, section_key)
key = 217
delim = b'*()%@5YT!@#G__T@#$%^&*()__#@$#57$#!@'
out_dir = '/tmp'
for sample_sha256, c2 in get_c2s_from_feed():
    # Lol fix url format
    c2 = c2.strip(" ")
    print(f"Downloading payload for {sample_sha256}")
    print(f"\tURL: {c2}")
    payload = get_payload(c2)
    if payload is None:
        print("\tURL is dead")
        continue
    if payload.find(b'</html>') != -1:
        print("\tPayload removed")
        continue
    final_payload = decrypt_download(payload, key, delim)
    if final_payload is None:
        # decrypt_download already reported the failure; skip this sample
        continue
    payload_hash = hashlib.sha256(final_payload).hexdigest()
    payload_path = f"{out_dir}/{payload_hash}.bin"
    print(f"\tDropping payload to {payload_path}")
    with open(payload_path, 'wb') as fp:
        fp.write(final_payload)