
Tools

src.tools.helpers

defang_ioc

defang_ioc(s)

Normalize common defanged indicators in a string. Examples: hxxp -> http, [.] -> ., (.) -> ., {.} -> .

Source code in src/tools/helpers.py, lines 94-105
def defang_ioc(s: str) -> str:
    """
    Normalize common defanged indicators in a string.
    Examples: hxxp -> http, [.] -> ., (.) -> ., {.} -> .
    """
    try:
        t = s
        t = t.replace("hxxps://", "https://").replace("hxxp://", "http://").replace("hxxp:", "http:")
        t = t.replace("[.]", ".").replace("(.)", ".").replace("{.}", ".").replace("(dot)", ".").replace("[dot]", ".")
        return t
    except Exception:
        return s
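
A minimal usage sketch (the input string is illustrative; the module path follows the heading above):

from src.tools.helpers import defang_ioc

print(defang_ioc("hxxps://evil[.]example(dot)com/payload"))
# -> https://evil.example.com/payload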

src.tools.static_analysis

calculateentropy

calculateentropy(path, head_bytes=None)

Compute the Shannon entropy of the entire file, or of only the first head_bytes bytes.

Source code in src/tools/static_analysis.py, lines 284-293
def calculateentropy(path: str, head_bytes: Optional[int] = None) -> Dict[str, Any]:
    """
    File entropy (entire file) or only header bytes (head_bytes).
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}
    data = read_file(path)
    if head_bytes and head_bytes > 0:
        data = data[:head_bytes]
    return {"path": os.path.abspath(path), "entropy": entropy(data), "sampled_bytes": len(data)}

detect_anti_analysis

detect_anti_analysis(path)

Anti-debug/Anti-VM/Anti-sandbox via keywords in strings.

Source code in src/tools/static_analysis.py, lines 546-570
def detect_anti_analysis(path: str) -> Dict[str, Any]:
    """
    Anti-debug/Anti-VM/Anti-sandbox via keywords in strings.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}
    data = read_file(path)
    s = [x.lower() for x in get_ascii_strings(data, min_len=4)][:5000]

    patterns = {
        "Anti-Debug": ["isdebuggerpresent", "checkremotedebuggerpresent", "debugger", "ollydbg", "x64dbg", "ida", "windbg", "ghidra"],
        "Anti-VM": ["vmware", "vbox", "virtualbox", "qemu", "xen", "hyperv"],
        "Anti-Sandbox": ["sandbox", "cuckoo", "joesandbox", "anyrun"],
        "Timing Checks": ["sleep", "gettickcount", "rdtsc", "timegettime"],
        "Process Checks": ["tasklist", "taskmgr", "procmon", "procexp"],
    }

    hits = []
    for cat, keys in patterns.items():
        for k in keys:
            if any(k in x for x in s):
                hits.append(f"{cat}: {k}")
                break

    return {"anti_analysis": hits}

detect_obfuscation

detect_obfuscation(path)

Obfuscation heuristics: many high-entropy regions, simple XOR-like patterns, and "noisy" strings.

Source code in src/tools/static_analysis.py, lines 572-608
def detect_obfuscation(path: str) -> Dict[str, Any]:
    """
    Obfuscation heuristics: many high-entropy regions, simple XOR-like patterns,
    and "noisy" strings.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    indicators = []

    # Many high-entropy regions
    highentropy_chunks = 0
    chunk = 4096
    for i in range(0, len(data), chunk):
        if entropy(data[i:i+chunk]) >= 7.2:
            highentropy_chunks += 1
    if highentropy_chunks >= 8:
        indicators.append(f"Many high-entropy blocks: {highentropy_chunks}")

    # Simple XOR-like patterns (look for 0x30-0x3f sequences in streams)
    xor_like = len(re.findall(rb"[\x30-\x3f]{3,}", data[:1_000_000]))  # limit to the first 1 MB
    if xor_like > 50:
        indicators.append(f"Possible XOR/obfuscation byte streams: {xor_like}")

    # "Noisy" strings (many mixed symbols)
    strings_all = get_ascii_strings(data, min_len=8)[:5000]
    noisy = 0
    for s in strings_all:
        # many non-alphanumeric symbols
        sym = sum(1 for c in s if not c.isalnum() and c not in " .:/_-")
        if len(s) > 16 and sym / max(1, len(s)) > 0.35:
            noisy += 1
    if noisy > 50:
        indicators.append(f"Many noisy strings: {noisy}")

    return {"obfuscation": indicators}

detect_packers

detect_packers(path)

Packer heuristics based on string keywords, PE section names, and section entropy.

Source code in src/tools/static_analysis.py, lines 448-499
def detect_packers(path: str) -> Dict[str, Any]:
    """
    Packer heuristics via section names/strings and string keywords.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    candidates = set()

    # Strings
    s = [x.lower() for x in get_ascii_strings(data, min_len=4)][:5000]
    def any_in(strings, subs):
        for sub in subs:
            if any(sub in x for x in strings):
                return True
        return False

    # Classic signals
    known = {
        "UPX": ["upx", "upx0", "upx1", "upx!"],
        "Themida": ["themida", "themida!"],
        "VMProtect": ["vmprotect", "vmp"],
        "ASPack": ["aspack", "aspack!"],
        "PECompact": ["pecompact", "pec1", "pec2"],
        "Armadillo": ["armadillo", "armadillo!"],
        "Obsidium": ["obsidium", "obsidium!"],
        "Enigma": ["enigma", "enig"],
        "MoleBox": ["molebox", "molebox!"],
        "Petite": ["petite", "petite!"],
    }
    for name, sigs in known.items():
        if any_in(s, sigs):
            candidates.add(name)

    # Sections and entropy
    if sniff_header(data) == "PE":
        if pefile:
            try:
                pe = pefile.PE(path, fast_load=True)
                for sec in pe.sections:
                    n = sec.Name.rstrip(b"\x00").decode(errors="ignore").lower()
                    raw = sec.get_data() or b""
                    ent = entropy(raw)
                    if n.startswith(".upx"):
                        candidates.add("UPX")
                    if ent >= 7.2:
                        candidates.add("HighEntropy")
            except:
                pass

    return {"packers": sorted(candidates)}

detect_suspicious_characteristics

detect_suspicious_characteristics(path)

General heuristics: RWX sections, very few imports, unusual entry point, etc.

Source code in src/tools/static_analysis.py, lines 501-544
def detect_suspicious_characteristics(path: str) -> Dict[str, Any]:
    """
    General heuristics: RWX sections, very few imports, unusual entry point, etc.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    suspicious = []

    if sniff_header(data) != "PE":
        return {"note": "Non-PE or undetected", "suspicious": suspicious}

    if not pefile:
        return {"error": "'pefile' not available"}

    try:
        pe = pefile.PE(path, fast_load=True)
        # RWX
        for s in pe.sections:
            ch = int(getattr(s, "Characteristics", 0))
            if (ch & 0x20000000) and (ch & 0x80000000):  # EXEC & WRITE
                sec_name = s.Name.rstrip(b"\x00").decode(errors="ignore")
                suspicious.append("RWX section: " + sec_name)

        imp_cnt = 0
        if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
            for entry in getattr(pe, "DIRECTORY_ENTRY_IMPORT", []):
                imp_cnt += len(entry.imports or [])
        if imp_cnt <= 5:
            suspicious.append(f"Very few imports ({imp_cnt}) - possible packing")

        # Entry point far from start (conservative threshold)
        try:
            ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
            if ep and ep > 0x100000:  # conservative threshold
                suspicious.append(f"Unusual entry point RVA: 0x{ep:x}")
        except:
            pass

    except Exception as e:
        return {"error": f"suspicious characteristics error: {e}"}

    return {"suspicious": suspicious}

extract_advanced_indicators

extract_advanced_indicators(path)

Consolidate packers, suspicious characteristics, anti-analysis, and obfuscation.

Source code in src/tools/static_analysis.py, lines 610-625
def extract_advanced_indicators(path: str) -> Dict[str, Any]:
    """
    Consolidate packers, suspicious characteristics, anti-analysis, and obfuscation.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}
    pack = detect_packers(path)
    sus = detect_suspicious_characteristics(path)
    anti = detect_anti_analysis(path)
    obf = detect_obfuscation(path)
    return {
        "packer_indicators": pack.get("packers", []),
        "suspicious_characteristics": sus.get("suspicious", []) if isinstance(sus, dict) else [],
        "anti_analysis": anti.get("anti_analysis", []) if isinstance(anti, dict) else [],
        "obfuscation": obf.get("obfuscation", []) if isinstance(obf, dict) else [],
    }
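
A usage sketch that prints each consolidated indicator group (the path is hypothetical):

from src.tools.static_analysis import extract_advanced_indicators

report = extract_advanced_indicators("sample.exe")
for key in ("packer_indicators", "suspicious_characteristics", "anti_analysis", "obfuscation"):
    print(key, "->", report.get(key, []))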

extract_basic_pe_info

extract_basic_pe_info(path)

Hashes, size, type, compile timestamp, packer hint, and import count.

Source code in src/tools/static_analysis.py, lines 107-158
def extract_basic_pe_info(path: str) -> Dict[str, Any]:
    """
    Hashes, size, type, compile timestamp, packer hint, and import count.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    t = sniff_header(data)
    info = {
        "path": os.path.abspath(path),
        "type": t,
        "size_bytes": os.path.getsize(path),
        "md5": hashlib.md5(data).hexdigest(),
        "sha1": hashlib.sha1(data).hexdigest(),
        "sha256": hashlib.sha256(data).hexdigest(),
    }

    if t != "PE":
        info["note"] = "Non-PE or undetected"
        return info

    if not pefile:
        info["error"] = "'pefile' not available"
        return info

    try:
        pe = pefile.PE(path, fast_load=True)
        ts = getattr(pe.FILE_HEADER, "TimeDateStamp", None)
        info["compile_timestamp"] = int(ts) if ts else None

        # Simple packer heuristic
        sections = []
        for s in pe.sections:
            name = s.Name.rstrip(b"\x00").decode(errors="ignore")
            raw = s.get_data() or b""
            sections.append({"name": name, "entropy": entropy(raw)})
        info["packer_hint"] = any(
            (sec["name"].lower().startswith(".upx") or "pack" in sec["name"].lower() or sec["entropy"] >= 7.2)
            for sec in sections
        )

        count = 0
        if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
            for entry in getattr(pe, "DIRECTORY_ENTRY_IMPORT", []):
                count += len(entry.imports or [])
        info["import_count"] = count

    except Exception as e:
        info["error"] = f"pefile parse error: {e}"

    return info
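
A usage sketch; compile_timestamp is the raw PE TimeDateStamp (seconds since the Unix epoch), decoded here for readability:

import datetime

from src.tools.static_analysis import extract_basic_pe_info

info = extract_basic_pe_info("sample.exe")
ts = info.get("compile_timestamp")
if ts:
    print(datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc).isoformat())
print(info.get("sha256"), info.get("packer_hint"), info.get("import_count"))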

extract_code_signatures

extract_code_signatures(path, max_sigs=3, window=32)

Simple heuristic: extract hex byte signatures around the entry point and the start of the first executable section.

Source code in src/tools/static_analysis.py, lines 397-444
def extract_code_signatures(path: str, max_sigs: int = 3, window: int = 32) -> Dict[str, Any]:
    """
    Simple heuristic: extract hex signatures around the EntryPoint (and other heuristics).
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    if sniff_header(data) != "PE":
        return {"note": "Non-PE or undetected"}

    if not pefile:
        return {"error": "'pefile' not available"}

    sigs = []
    try:
        pe = pefile.PE(path, fast_load=True)
        entry_rva = pe.OPTIONAL_HEADER.AddressOfEntryPoint
        entry_off = _rva_to_file_offset(pe, entry_rva)
        if entry_off is not None:
            start = max(0, entry_off)
            end = min(len(data), start + max(16, window))
            sigs.append({
                "label": "EntryPoint",
                "file_offset": start,
                "hex": " ".join(f"{b:02x}" for b in data[start:end])
            })

        # Extra heuristic: first executable section
        for s in pe.sections:
            ch = int(getattr(s, "Characteristics", 0))
            if ch & 0x20000000:  # EXECUTE
                off = int(s.PointerToRawData or 0)
                size = int(s.SizeOfRawData or 0)
                if size > 0:
                    end = min(len(data), off + min(size, window))
                    sec_name = s.Name.rstrip(b"\x00").decode(errors="ignore")
                    sigs.append({
                        "label": "ExecSection:" + sec_name,
                        "file_offset": off,
                        "hex": " ".join(f"{b:02x}" for b in data[off:end])
                    })
                break

        return {"signatures": sigs[:max_sigs]}

    except Exception as e:
        return {"error": f"signature parse error: {e}"}

extract_imports_analysis

extract_imports_analysis(path)

Categorize imports by area (network, crypto, system, etc.).

Source code in src/tools/static_analysis.py, lines 162-238
def extract_imports_analysis(path: str) -> Dict[str, Any]:
    """ Categorize imports by area (network, crypto, system, etc.). """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    if sniff_header(data) != "PE":
        return {"note": "Non-PE or undetected"}

    if not pefile:
        return {"error": "'pefile' not available"}

    categories = {
        "network":   ["wininet", "winhttp", "ws2_32", "iphlpapi", "wsock32", "urlmon"],
        "crypto":    ["crypt32", "bcrypt", "advapi32", "ncrypt", "secur32", "wintrust"],
        "system":    ["kernel32", "ntdll", "user32", "gdi32", "shell32", "ole32", "oleaut32", "rpcrt4"],
        "registry":  ["advapi32", "shlwapi"],
        "file":      ["kernel32", "ntdll", "msvcrt"],
        "process":   ["kernel32", "psapi", "tlhelp32", "ntdll"],
        "wmi":       ["wbem", "wbemcli", "wbemprox", "wmi"],
        "com":       ["ole32", "oleaut32", "comctl32", "comdlg32"],
        "scheduling":["taskschd", "advapi32", "kernel32"],
        "memory":    ["kernel32", "ntdll", "msvcrt"],
        "other":     [],
    }

    categorized: Dict[str, list] = {k: [] for k in categories.keys()}

    try:
        pe = pefile.PE(path, fast_load=True)
        try:
            dirs = [pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']]
            delay_dir = pefile.DIRECTORY_ENTRY.get('IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT')
            if delay_dir is not None:
                dirs.append(delay_dir)
            pe.parse_data_directories(directories=dirs)
        except Exception:
            try:
                pe.parse_data_directories()
            except Exception as e:
                return {"error": f"data directories parse error: {e}"}

        def process_entries(entries):
            for entry in entries or []:
                dll_bytes = entry.dll or b""
                lib = dll_bytes.decode(errors="ignore").lower()
                for imp in (entry.imports or []):
                    if getattr(imp, "name", None):
                        name = imp.name.decode(errors="ignore")
                    else:
                        name = f"ord#{getattr(imp, 'ordinal', '?')}"
                    placed = False
                    for cat, prefixes in categories.items():
                        if any(lib.startswith(pfx) for pfx in prefixes):
                            categorized[cat].append(f"{lib}!{name}")
                            placed = True
                            break
                    if not placed:
                        categorized["other"].append(f"{lib}!{name}")

        if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
            process_entries(pe.DIRECTORY_ENTRY_IMPORT)
        if hasattr(pe, "DIRECTORY_ENTRY_DELAY_IMPORT"):
            process_entries(pe.DIRECTORY_ENTRY_DELAY_IMPORT)

        trimmed = {k: v[:50] for k, v in categorized.items() if v}

        if not trimmed:
            return {
                "imports": {},
                "note": "No imports found after parsing (packed sample? API hashing/dynamic resolution? delay-load ausente?)."
            }

        return {"imports": trimmed}

    except Exception as e:
        return {"error": f"imports parse error: {e}"}

extract_iocs_from_strings

extract_iocs_from_strings(path, min_length=4, max_strings=10000, max_iocs=10000)

Extract IOCs (URLs, domains, IPv4s, and cryptocurrency wallets) from ASCII strings in a file. Accepts optional limits for number of strings scanned and IOCs returned.

Source code in src/tools/static_analysis.py, lines 14-60
def extract_iocs_from_strings(path: str, min_length: int = 4, max_strings: int = 10000, max_iocs: int = 10000) -> Dict[str, Any]:
    """
    Extract IOCs (URLs, domains, IPv4s, and cryptocurrency wallets) from ASCII strings in a file.
    Accepts optional limits for number of strings scanned and IOCs returned.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    strings = get_ascii_strings(data, min_len=min_length)[: max(0, int(max_strings))]

    url_re = re.compile(r"\bhttps?://[^\s'\"<>]+", re.I)
    urls_set = set()
    for s in strings:
        for look in (s, defang_ioc(s)):
            for m in url_re.finditer(look):
                urls_set.add(m.group(0).rstrip(").,]"))
    urls = list(urls_set)[: max(0, int(max_iocs))]

    domain_re = re.compile(r"\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{2,63})\b", re.I)
    domains_set = set()
    for s in strings:
        for look in (s, defang_ioc(s)):
            for m in domain_re.finditer(look):
                dom = m.group(0).lower().rstrip(").,]")
                domains_set.add(dom)
    domains_all = list(domains_set)[: max(0, int(max_iocs))]

    ipv4_re = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
    ipv4s_all = [m.group(0) for s in strings for m in ipv4_re.finditer(s)]
    ipv4s = [ip for ip in ipv4s_all if all(0 <= int(p) <= 255 for p in ip.split("."))][: max(0, int(max_iocs))]

    btc_re = re.compile(r"\b(?:[13][a-km-zA-HJ-NP-Z1-9]{25,34}|bc1[ac-hj-np-z02-9]{25,39})\b")
    eth_re = re.compile(r"\b0x[a-fA-F0-9]{40}\b")
    btc = list({m.group(0) for s in strings for m in btc_re.finditer(s)})[: max(0, int(max_iocs))]
    eth = list({m.group(0) for s in strings for m in eth_re.finditer(s)})[: max(0, int(max_iocs))]

    return {
        "path": os.path.abspath(path),
        "counts": {"urls": len(urls), "domains": len(domains_all), "ipv4s": len(ipv4s),
                "btc_addresses": len(btc), "eth_addresses": len(eth)},
        "urls": urls,
        "domains": domains_all,
        "ipv4s": ipv4s,
        "btc_addresses": btc,
        "eth_addresses": eth
    }
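
A self-contained sketch that writes a small buffer with defanged indicators to a temporary file and extracts IOCs from it:

import os
import tempfile

from src.tools.static_analysis import extract_iocs_from_strings

blob = b"C2 at hxxps://bad[.]example[.]com/gate and 185.220.101.1\x00padding"
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(blob)
    tmp = f.name
try:
    print(extract_iocs_from_strings(tmp)["counts"])  # expected: one URL, one domain, one IPv4
finally:
    os.remove(tmp)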

extract_sections_analysis

extract_sections_analysis(path)

Return name, sizes, entropy, and basic flags for each section.

Source code in src/tools/static_analysis.py, lines 242-280
def extract_sections_analysis(path: str) -> Dict[str, Any]:
    """
    Return name, sizes, entropy, and basic flags for each section.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    if sniff_header(data) != "PE":
        return {"note": "Non-PE or undetected"}

    if not pefile:
        return {"error": "'pefile' not available"}

    try:
        pe = pefile.PE(path, fast_load=True)
        out = []
        for s in pe.sections:
            name = s.Name.rstrip(b"\x00").decode(errors="ignore")
            raw = s.get_data() or b""
            ch = int(getattr(s, "Characteristics", 0))
            flags = []
            # IMAGE_SCN_MEM_* flags
            if ch & 0x20000000:  # EXECUTE
                flags.append("exec")
            if ch & 0x80000000:  # WRITE
                flags.append("write")
            if ch & 0x40000000:  # READ
                flags.append("read")
            out.append({
                "name": name,
                "virtual_size": int(getattr(s, "Misc_VirtualSize", 0)),
                "raw_size": int(s.SizeOfRawData),
                "entropy": entropy(raw),
                "characteristics": flags
            })
        return {"sections": out}
    except Exception as e:
        return {"error": f"sections parse error: {e}"}

extract_stable_strings

extract_stable_strings(path, min_length=4, max_items=50)

Extract ASCII strings and filter by relevance/stability.

Source code in src/tools/static_analysis.py, lines 375-384
def extract_stable_strings(path: str, min_length: int = 4, max_items: int = 50) -> Dict[str, Any]:
    """
    Extract ASCII strings and filter by relevance/stability.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}
    data = read_file(path)
    strs = get_ascii_strings(data, min_len=min_length)
    stables = [s for s in strs if is_stable_string_impl(s)]
    return {"path": os.path.abspath(path), "strings": stables[:max_items], "total_candidates": len(stables)}

extract_triage_data

extract_triage_data(path, strings_min_len=4)

Run consolidated triage: basic info, imports, sections, version, stable strings, code signatures, advanced indicators, and local YARA/CAPA.

Source code in src/tools/static_analysis.py, lines 64-103
def extract_triage_data(path: str, strings_min_len: int = 4) -> Dict[str, Any]:
    """
    Run consolidated triage: basic info, imports, sections, version,
    stable strings, code signatures, advanced indicators, and local YARA/CAPA.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    basic = extract_basic_pe_info(path)
    imports = extract_imports_analysis(path)
    sections = extract_sections_analysis(path)
    version = extract_version_info(path)
    stable = extract_stable_strings(path, min_length=strings_min_len)
    signatures = extract_code_signatures(path)
    advanced = extract_advanced_indicators(path)
    sh_entropy = calculateentropy(path)
    try:
        yara = yara_scan.func(path) 
    except Exception as e:
        yara = {"error": str(e)}
    try:
        capa = capa_scan.func(path)
    except Exception as e:
        capa = {"error": str(e)}
    iocs = extract_iocs_from_strings(path, min_length=strings_min_len)

    return {
        "path": os.path.abspath(path),
        "basic_info": basic,
        "shannon_entropy": sh_entropy,
        "imports": imports,
        "sections": sections,
        "version_info": version,
        "stable_strings": stable.get("strings", []) if isinstance(stable, dict) else stable,
        "code_signatures": signatures.get("signatures", []) if isinstance(signatures, dict) else signatures,
        "advanced_indicators": advanced,
        "yara": yara,
        "capa": capa,
        "iocs": iocs
    }
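
A usage sketch that runs the full triage and dumps a few top-level blocks as JSON (the path is hypothetical):

import json

from src.tools.static_analysis import extract_triage_data

triage = extract_triage_data("sample.exe")
subset = {k: triage.get(k) for k in ("basic_info", "advanced_indicators", "iocs")}
print(json.dumps(subset, indent=2, default=str)[:2000])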

extract_version_info

extract_version_info(path)

Extract VS_VERSION_INFO (StringFileInfo) when available.

Source code in src/tools/static_analysis.py, lines 297-334
def extract_version_info(path: str) -> Dict[str, Any]:
    """
    Extract VS_VERSION_INFO (StringFileInfo) when available.
    """
    if not file_exists(path):
        return {"error": f"file not found: {path}"}

    data = read_file(path)
    if sniff_header(data) != "PE":
        return {"note": "Non-PE or undetected"}

    if not pefile:
        return {"error": "'pefile' not available"}

    info = {
        "CompanyName": "Not found",
        "FileDescription": "Not found",
        "ProductName": "Not found",
        "OriginalFilename": "Not found",
        "LegalCopyright": "Not found",
        "FileVersion": "Not found",
        "ProductVersion": "Not found",
        "InternalName": "Not found",
    }
    try:
        pe = pefile.PE(path, fast_load=False)
        if hasattr(pe, "FileInfo") and pe.FileInfo:
            for fileinfo in pe.FileInfo:
                if fileinfo and hasattr(fileinfo, "StringTable"):
                    for st in fileinfo.StringTable or []:
                        for k, v in st.entries.items():
                            key = k.decode(errors="ignore")
                            val = v.decode(errors="ignore")
                            if key in info:
                                info[key] = val
        return info
    except Exception as e:
        return {"error": f"version parse error: {e}"}

is_stable_string

is_stable_string(s)

Return whether a string is a 'stable' and relevant candidate.

Source code in src/tools/static_analysis.py, lines 368-373
def is_stable_string(s: str) -> Dict[str, Any]:
    """Return whether a string is a 'stable' and relevant candidate."""
    try:
        return {"string": s, "stable": bool(is_stable_string_impl(s))}
    except Exception as e:
        return {"error": str(e)}

src.tools.yara_tool

yara_scan

yara_scan(path)

Run YARA scan against a file and summarize matches.

Source code in src/tools/yara_tool.py, lines 14-86
@tool
@log_tool("yara_scan")
def yara_scan(path:str)->dict:
    """Run YARA scan against a file and summarize matches."""
    if not exists(path):
        return {"error": f"file not found: {path}"}
    rules_dir = YARA_RULES_DIR
    if not rules_dir:
        return {"error": "YARA_RULES_DIR not set and rules_dir not provided"}

    rule_files={}
    if os.path.isfile(rules_dir) and rules_dir.lower().endswith((".yar",".yara")):
        single_path = os.path.abspath(rules_dir)
        try:
            log.debug("Compiling YARA rules (single file): %s", single_path)
            rules = yara.compile(filepath=single_path)
        except Exception as e:
            log.exception("YARA compile error (single file): %s", e)
            return {"error": f"YARA compile error: {e}"}
    else:
        for root,_,files in os.walk(rules_dir):
            for fn in files:
                if fn.lower().endswith((".yar",".yara")):
                    key = os.path.relpath(os.path.join(root, fn), rules_dir)
                    rule_files[key] = os.path.join(root, fn)
        if not rule_files:
            return {"warning": f"No YARA rules found in {os.path.abspath(rules_dir)}"}
        try:
            log.debug("Compiling YARA rules from %s files", len(rule_files))
            rules = yara.compile(filepaths=rule_files)
        except Exception as e:
            log.exception("YARA compile error (directory): %s", e)
            return {"error": f"YARA compile error: {e}"}
    try:
        basename = os.path.basename(path)
        ext = os.path.splitext(basename)[1].lstrip(".").lower()
        filesize = os.path.getsize(path)
        with open(path, "rb") as f:
            data = f.read()
        externals = {
            "filename": basename,
            "filepath": os.path.abspath(path),
            "extension": ext,
            "filesize": filesize,
            "sha256": hashlib.sha256(data).hexdigest(),
            "md5": hashlib.md5(data).hexdigest(),
        }
    except Exception:
        externals = {}

    try:
        matches = rules.match(filepath=path, timeout=DEFAULT_TIMEOUT, externals=externals)
    except Exception as e:
        log.exception("YARA match error: %s", e)
        return {"error": f"YARA match error: {e}"}
    res = []
    fam = []

    for m in matches:
        meta = dict(getattr(m, "meta", {}) or {})
        description = meta.get("description", "")
        res.append({
            "rule": m.rule,
            "description": description
        })

    result = {
        "match_count": len(res), 
        "matches": res
    }

    log.info("YARA matches: %s", result["match_count"])
    return result
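
Because yara_scan is wrapped by the @tool decorator, the triage code above calls it through .func; a sketch assuming YARA_RULES_DIR points at a .yar/.yara file or a directory of rules:

from src.tools.yara_tool import yara_scan

result = yara_scan.func("sample.exe")
if "error" in result or "warning" in result:
    print(result)
else:
    print(result["match_count"], [m["rule"] for m in result["matches"]])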

src.tools.capa_tool

build_result_document

build_result_document(rules_path, input_file, signature_paths=None)

Performs extraction and matching, packages metadata/layout. Returns the ResultDocument and the structures needed for different renderers.

Source code in src/tools/capa_tool.py, lines 140-166
def build_result_document(
    rules_path: Path,
    input_file: Path,
    signature_paths: Optional[List[Path]] = None,
) -> tuple[rd.ResultDocument, capa.rules.RuleSet, capa.capabilities.common.CapabilitiesResult, Any]:
    """
    Performs extraction and matching, packages metadata/layout.
    Returns the ResultDocument and the structures needed for different renderers.
    """
    silence_vivisect_logging()
    rules = capa.rules.get_rules([rules_path])
    signature_paths = signature_paths or []
    extractor = capa.loader.get_extractor(
        input_file,
        FORMAT_AUTO,
        OS_AUTO,
        capa.main.BACKEND_VIV,
        signature_paths,
        should_save_workspace=False,
        disable_progress=True,
    )

    capabilities = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
    meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, capabilities)
    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches)
    return doc, rules, capabilities, meta

capa_scan

capa_scan(path, output_format='summary')

Execute a CAPA scan and return summarized JSON.

Source code in src/tools/capa_tool.py, lines 168-205
@tool
@log_tool("capa_scan")
def capa_scan(
        path: str,
        output_format: str = "summary",
        ) -> Any:
    """Executes a CAPA Scan and returns a summarized JSON"""
    log.info("CAPA: scanning path=%s format=%s", path, output_format)
    input_file = Path(path)
    if not input_file.exists():
        raise FileNotFoundError(f"File not found: {input_file}")

    rules_path = get_rules_path()
    sigs_paths = get_signatures_path()

    doc, rules, capabilities, meta = build_result_document(rules_path, input_file, sigs_paths)

    if output_format == "json":
        result = json.loads(capa.render.json.render(meta, rules, capabilities.matches))
        log.info("CAPA: completed (full json) capabilities=%d", sum(len(v) for v in result.get("CAPABILITY", {}).values()) if isinstance(result, dict) else -1)
        return result

    d = render_dictionary(doc)
    if output_format in ("dictionary", "dict"):
        log.info("CAPA: completed (dict) capabilities=%d", sum(len(v) for v in d.get("CAPABILITY", {}).values()))
        return d

    cap = {k: v[:12] for k, v in (d.get("CAPABILITY") or {}).items()}
    att = {k: v[:10] for k, v in (d.get("ATTCK") or {}).items()}
    mbc = {k: v[:10] for k, v in (d.get("MBC") or {}).items()}
    out = {
        "sha256": d.get("sha256"),
        "CAPABILITY": cap,
        "ATTCK": att,
        "MBC": mbc,
    }
    log.info("CAPA: completed (summary) caps_namespaces=%d", len(cap))
    return out
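
A usage sketch of the supported output formats ("summary" by default, "dict"/"dictionary" for the full rendered dictionary, "json" for capa's native JSON), invoked through .func as in the triage code:

from src.tools.capa_tool import capa_scan

summary = capa_scan.func("sample.exe")                     # trimmed CAPABILITY / ATTCK / MBC
full = capa_scan.func("sample.exe", output_format="dict")  # full rendered dictionary
print(list(summary.get("CAPABILITY", {}).items())[:3])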

find_subrule_matches

find_subrule_matches(doc)

Collect the names of rules matched as sub-rules (match features).

Source code in src/tools/capa_tool.py, lines 42-60
def find_subrule_matches(doc: rd.ResultDocument) -> Set[str]:
    """Colects Submatches rule names"""
    matches: Set[str] = set()

    def rec(node: rd.Match) -> None:
        if not node.success:
            return
        if isinstance(node.node, rd.StatementNode):
            for child in node.children:
                rec(child)
        elif isinstance(node.node, rd.FeatureNode):
            if isinstance(node.node.feature, frzf.MatchFeature):
                matches.add(node.node.feature.match)

    for rule in rutils.capability_rules(doc):
        for _, node in rule.matches:
            rec(node)

    return matches

render_attack

render_attack(doc, result)

Generates ATT&CK structure grouped by tactic

Source code in src/tools/capa_tool.py, lines 74-88
def render_attack(doc: rd.ResultDocument, result: dict) -> None:
    """Generates ATT&CK structure grouped by tactic"""
    result["ATTCK"] = {}
    tactics = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule.meta.attack:
            continue
        for attack in rule.meta.attack:
            tactics[attack.tactic].add((attack.technique, attack.subtechnique, attack.id))

    for tactic, techniques in sorted(tactics.items()):
        rows = []
        for technique, subtechnique, tid in sorted(techniques):
            rows.append(f"{technique} {tid}" if subtechnique is None else f"{technique}::{subtechnique} {tid}")
        result["ATTCK"].setdefault(tactic.upper(), rows)

render_capabilities

render_capabilities(doc, result)

Builds a CAPABILITY dictionary with keys = namespaces and values = a list of capabilities.

Source code in src/tools/capa_tool.py, lines 62-72
def render_capabilities(doc: rd.ResultDocument, result: dict) -> None:
    """Builds a CAPABILITY dictionary with keys = namespaces and values = a list of capabilities."""
    subrule_matches = find_subrule_matches(doc)
    result["CAPABILITY"] = {}
    for rule in rutils.capability_rules(doc):
        if rule.meta.name in subrule_matches:
            continue
        count = len(rule.matches)
        capability = rule.meta.name if count == 1 else f"{rule.meta.name} ({count} matches)"
        result["CAPABILITY"].setdefault(rule.meta.namespace, [])
        result["CAPABILITY"][rule.meta.namespace].append(capability)

render_dictionary

render_dictionary(doc)

Consolidates the final dictionary with meta, ATT&CK, MBC and Capabilities.

Source code in src/tools/capa_tool.py, lines 106-113
def render_dictionary(doc: rd.ResultDocument) -> dict:
    """Consolidates the final dictionary with meta, ATT&CK, MBC and Capabilities."""
    result: dict[str, Any] = {}
    render_meta(doc, result)
    render_attack(doc, result)
    render_mbc(doc, result)
    render_capabilities(doc, result)
    return result

render_mbc

render_mbc(doc, result)

Generates the MBC structure grouped by objective

Source code in src/tools/capa_tool.py, lines 90-104
def render_mbc(doc: rd.ResultDocument, result: dict) -> None:
    """Generates MBC Structure"""
    result["MBC"] = {}
    objectives = collections.defaultdict(set)
    for rule in rutils.capability_rules(doc):
        if not rule.meta.mbc:
            continue
        for mbc in rule.meta.mbc:
            objectives[mbc.objective].add((mbc.behavior, mbc.method, mbc.id))

    for objective, behaviors in sorted(objectives.items()):
        rows = []
        for behavior, method, mid in sorted(behaviors):
            rows.append(f"{behavior} [{mid}]" if method is None else f"{behavior}::{method} [{mid}]")
        result["MBC"].setdefault(objective.upper(), rows)

silence_vivisect_logging

silence_vivisect_logging()

Reduce noise from vivisect/viv-utils/envi by lowering their logger levels and preventing propagation.

Source code in src/tools/capa_tool.py, lines 23-34
def silence_vivisect_logging() -> None:
    """Reduce noise from vivisect/viv-utils/envi by lowering their logger levels and preventing propagation."""
    for name in ("vivisect", "viv_utils", "viv", "capa.loader.viv", "envi", "envi.codeflow"):
        try:
            lg = logging.getLogger(name)
            lg.setLevel(logging.ERROR)
            lg.propagate = False
            has_null = any(isinstance(h, logging.NullHandler) for h in lg.handlers)
            if not has_null:
                lg.addHandler(logging.NullHandler())
        except Exception:
            pass

src.tools.cti_analysis

hybrid_analysis_lookup

hybrid_analysis_lookup(sha256)

Hybrid Analysis Sandbox Lookup (SHA256)

Source code in src/tools/cti_analysis.py, lines 67-84
def hybrid_analysis_lookup(sha256: str) -> Dict[str, Any]:
    """Hybrid Analysis Sandbox Lookup (SHA256)"""
    if not HA_API_KEY:
        return {"error": "HA_API_KEY not set"}
    if not sha256:
        return {"error": "empty sha256"}

    headers = {
        "api-key": HA_API_KEY,
        "User-Agent": "Falcon Sandbox",
        "accept": "application/json",
    }

    url = f"https://hybrid-analysis.com/api/v2/overview/{sha256}"
    st, txt, js = http_get(url, headers=headers, timeout=DEFAULT_TIMEOUT)
    if st != 200:
        return {"error": f"HA HTTP {st}", "text": txt[:400]}
    return js

malwarebazaar_lookup

malwarebazaar_lookup(hash_value)

MalwareBazaar Hash Lookup (md5/sha1/sha256).

Source code in src/tools/cti_analysis.py, lines 27-41
def malwarebazaar_lookup(hash_value: str) -> Dict[str, Any]:
    """MalwareBazaar Hash Lookup (md5/sha1/sha256)."""
    if not ABUSE_KEY:
        return {"error": "ABUSE_API_KEY not set"}
    if not hash_value:
        return {"error": "empty hash"}
    st, txt, js = http_post(
        "https://mb-api.abuse.ch/api/v1/",
        data={"query": "get_info", "hash": hash_value},
        headers={"Auth-Key": ABUSE_KEY},
        timeout=DEFAULT_TIMEOUT,
    )
    if st != 200:
        return {"error": f"MB HTTP {st}", "text": (txt or "")[:400]}
    return js

normalize_hash

normalize_hash(vt, mb, ha, otx, sha256)

Normalize the provider responses (VirusTotal, MalwareBazaar, Hybrid Analysis, OTX) into a single summary record.

Source code in src/tools/cti_analysis.py, lines 86-162
def normalize_hash(vt: Dict[str, Any] | None,
                   mb: Dict[str, Any] | None,
                   ha: Dict[str, Any] | None,
                   otx: Dict[str, Any] | None,
                   sha256: str) -> Dict[str, Any]:
    """Simple JSON Normalization"""
    labels: List[str] = []
    refs: List[str] = []
    known_mal = None

    # VirusTotal tags/basics
    try:
        attrs = (vt or {}).get("data", {}).get("attributes", {})
        tags = attrs.get("tags") or []
        labels.extend([str(t) for t in tags][:20])
        if "last_analysis_stats" in attrs:
            mal = int(attrs["last_analysis_stats"].get("malicious", 0))
            known_mal = (mal > 0)
        refs.append(f"https://www.virustotal.com/gui/file/{sha256}")
    except Exception:
        pass

    # Small fields from MalwareBazaar
    try:
        if (mb or {}).get("query_status") == "ok":
            data = (mb or {}).get("data") or []
            if data:
                sig = data[0].get("signature")
                if sig: labels.append(sig)
                dl = data[0].get("download_url")
                if dl: refs.append(dl)
    except Exception:
        pass

    # OTX Pulses and Tags
    try:
        pulses = (otx or {}).get("pulse_info", {}).get("pulses", []) or []
        for p in pulses[:5]:
            name = p.get("name")
            if name: labels.append(name)
        sci = (otx or {}).get("indicator", {}).get("description")
        if sci: labels.append(str(sci))
        refs.append(f"https://otx.alienvault.com/indicator/file/{sha256}")
    except Exception:
        pass

    # Hybrid Analysis Family extractor
    try:
        if isinstance(ha, list) and ha:
            fam = ha[0].get("vx_family") or ha[0].get("verdict") or ha[0].get("threat_score")
            if fam:
                labels.append(str(fam))
        elif isinstance(ha, dict):
            fam = ha.get("vx_family") or ha.get("verdict") or ha.get("threat_score")
            if fam:
                labels.append(str(fam))
    except Exception:
        pass

    # Dedup
    labels = sorted({x for x in labels if x})
    refs   = sorted({x for x in refs if x})

    return {
        "hash": sha256,
        "providers": {
            "virustotal": vt,
            "malwarebazaar": mb,
            "hybridanalysis": ha,
            "otx": otx,
        },
        "summary": {
            "known_malicious": known_mal,
            "threat_labels": labels[:50],
            "references": refs[:50],
        },
    }
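
A sketch that combines all four provider lookups into one normalized record; each lookup reads its API key from the environment, and the hash below is a placeholder (the SHA-256 of empty input):

from src.tools.cti_analysis import (
    hybrid_analysis_lookup,
    malwarebazaar_lookup,
    normalize_hash,
    otx_query_ioc,
    vt_lookup,
)

sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"  # placeholder hash
report = normalize_hash(
    vt=vt_lookup(sha256),
    mb=malwarebazaar_lookup(sha256),
    ha=hybrid_analysis_lookup(sha256),
    otx=otx_query_ioc(sha256),
    sha256=sha256,
)
print(report["summary"])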

otx_query_ioc

otx_query_ioc(ioc)

AlienVault OTX Lookup (Hash, Domain and IP)

Source code in src/tools/cti_analysis.py, lines 43-65
def otx_query_ioc(ioc: str) -> Dict[str, Any]:
    """AlienVault OTX Lookup (Hash, Domain and IP)"""
    if not OTX_API_KEY:
        return {"error": "OTX_API_KEY not set"}
    ioc_type = detect_ioc_type(ioc)
    base = "https://otx.alienvault.com/api/v1/indicators"
    if ioc_type in ("sha256", "md5"):
        path = f"/file/{ioc}/general"
    elif ioc_type == "ip":
        path = f"/IPv4/{ioc}/general"
    elif ioc_type == "domain":
        path = f"/domain/{ioc}/general"
    else:
        return {"error": f"unsupported ioc type: {ioc_type}"}

    st, txt, js = http_get(
        base + path,
        headers={"X-OTX-API-KEY": OTX_API_KEY},
        timeout=DEFAULT_TIMEOUT
    )
    if st != 200:
        return {"error": f"OTX HTTP {st}", "text": txt[:400], "type": ioc_type}
    return js

vt_lookup

vt_lookup(sha256)

VirusTotal file lookup.

Source code in src/tools/cti_analysis.py, lines 12-25
def vt_lookup(sha256: str) -> Dict[str, Any]:
    """VirusTotal file lookup."""
    if not VT_API_KEY:
        return {"error": "VT_API_KEY not set"}
    if not sha256:
        return {"error": "empty sha256"}
    status, txt, js = http_get(
        f"https://www.virustotal.com/api/v3/files/{sha256}",
        headers={"x-apikey": VT_API_KEY},
        timeout=DEFAULT_TIMEOUT
    )
    if status != 200:
        return {"error": f"VT HTTP {status}", "text": txt[:400]}
    return js
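
A usage sketch that reads the detection stats the same way normalize_hash does (data.attributes.last_analysis_stats is the VirusTotal v3 file-object layout):

from src.tools.cti_analysis import vt_lookup

vt = vt_lookup("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")  # placeholder hash
stats = vt.get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
print(stats.get("malicious", 0), "engines flagged the file")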