Skip to content

Tests

tests.conftest

set_env

set_env(monkeypatch)

Alias of pytest's `monkeypatch` fixture, for readability in tests.

Source code in tests/conftest.py
12
13
14
15
@pytest.fixture
def set_env(monkeypatch):
    """Expose pytest's ``monkeypatch`` fixture under a more descriptive name.

    The fixture returns the very same ``monkeypatch`` object it receives, so
    a test that requests ``set_env`` gets the usual patching helpers without
    any extra behavior layered on top.
    """
    patcher = monkeypatch
    return patcher

tests.test_api_storage

test_save_and_get_analysis_by_sha256

test_save_and_get_analysis_by_sha256(tmp_path, monkeypatch)

Persists an analysis in a temp SQLite DB and fetches it by sha256.

Source code in tests/test_api_storage.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def test_save_and_get_analysis_by_sha256(tmp_path, monkeypatch):
    """Round-trip an analysis through storage and read it back by sha256.

    ``DB_PATH`` is pointed at a database file under ``tmp_path`` before
    anything is saved, so the test works against a temp SQLite DB.
    """
    monkeypatch.setenv("DB_PATH", str(tmp_path / "analyses_test.db"))

    sha256_digest = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    digests = {
        "md5": "d41d8cd98f00b204e9800998ecf8427e",
        "sha1": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
        "sha256": sha256_digest,
    }
    analysis = {"ok": True, "score": 42}

    record_id = storage.save_analysis("file.bin", 0, digests, analysis, hint="h", model="m")
    # The returned identifier must be a non-empty string.
    assert isinstance(record_id, str)
    assert len(record_id) > 0

    # Looking the record up by its sha256 must give back the saved result.
    assert storage.get_analysis_by_sha256(sha256_digest) == analysis

tests.test_cti_analysis

test_malwarebazaar_lookup_no_key

test_malwarebazaar_lookup_no_key(set_env)

Forces empty MalwareBazaar key and expects an explicit error response.

Source code in tests/test_cti_analysis.py
10
11
12
13
14
def test_malwarebazaar_lookup_no_key(set_env):
    """Check that a MalwareBazaar lookup reports a missing key explicitly.

    ``cti.ABUSE_KEY`` is patched to an empty string for the duration of the
    test; the lookup is then expected to return an ``error`` entry.
    """
    set_env.setattr(cti, "ABUSE_KEY", "")
    md5_digest = "d41d8cd98f00b204e9800998ecf8427e"
    response = cti.malwarebazaar_lookup(md5_digest)
    assert response.get("error") == "ABUSE_API_KEY not set"

test_normalize_hash_merges_labels_and_refs

test_normalize_hash_merges_labels_and_refs()

Ensures normalize_hash merges labels and references from providers.

Source code in tests/test_cti_analysis.py
22
23
24
25
26
27
28
29
30
31
32
33
def test_normalize_hash_merges_labels_and_refs():
    """Check that normalize_hash merges provider labels and references.

    Four hand-built provider payloads are passed in; the summary must carry
    a label containing the VT tag, a label containing the OTX pulse name, and
    a reference containing the MalwareBazaar download URL.
    """

    def mentions(needle, entries):
        # True when any entry contains ``needle``.
        return any(needle in entry for entry in entries)

    vt_report = {
        "data": {
            "attributes": {
                "tags": ["trojan", "loader"],
                "last_analysis_stats": {"malicious": 1},
            }
        }
    }
    mb_report = {
        "query_status": "ok",
        "data": [{"signature": "FakeSig", "download_url": "http://mb.example"}],
    }
    ha_report = [{"vx_family": "FamX"}]
    otx_report = {
        "pulse_info": {"pulses": [{"name": "Pulse1"}]},
        "indicator": {"description": "desc"},
    }

    normalized = cti.normalize_hash(vt_report, mb_report, ha_report, otx_report, "00" * 32)
    summary = normalized.get("summary", {})
    threat_labels = summary.get("threat_labels", [])
    references = summary.get("references", [])

    assert mentions("trojan", threat_labels)
    assert mentions("Pulse1", threat_labels)
    assert mentions("http://mb.example", references)

test_otx_query_unsupported_type_without_key

test_otx_query_unsupported_type_without_key(set_env)

Uses a dummy OTX key to reach type validation and assert the error.

Source code in tests/test_cti_analysis.py
16
17
18
19
20
def test_otx_query_unsupported_type_without_key(set_env):
    """Check that an unrecognized IOC string yields an unsupported-type error.

    A dummy ``OTX_API_KEY`` is patched in so the query reaches type
    validation rather than stopping earlier.
    """
    set_env.setattr(cti, "OTX_API_KEY", "dummy")
    response = cti.otx_query_ioc("not-an-ioc")
    error_text = response.get("error", "")
    assert "unsupported ioc type" in error_text

test_vt_lookup_no_key

test_vt_lookup_no_key(set_env)

Forces empty VT key and expects an explicit error response.

Source code in tests/test_cti_analysis.py
4
5
6
7
8
def test_vt_lookup_no_key(set_env):
    """Check that a VT lookup reports a missing key explicitly.

    ``cti.VT_API_KEY`` is patched to an empty string for the duration of the
    test; the lookup is then expected to return an ``error`` entry.
    """
    set_env.setattr(cti, "VT_API_KEY", "")
    sha256_digest = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    response = cti.vt_lookup(sha256_digest)
    assert response.get("error") == "VT_API_KEY not set"

tests.test_logging_config

test_configure_logging_allows_debug

test_configure_logging_allows_debug()

Configures logging to DEBUG and asserts the logger honors it.

Source code in tests/test_logging_config.py
5
6
7
8
9
def test_configure_logging_allows_debug():
    """Check that a logger honors DEBUG after logging is configured for it."""
    configure_logging("DEBUG")
    tool_logger = get_logger("tools.test")
    debug_enabled = tool_logger.isEnabledFor(logging.DEBUG)
    assert debug_enabled

test_log_tool_decorator_wraps_and_logs

test_log_tool_decorator_wraps_and_logs(caplog)

Verifies log_tool decorates a function and emits start logs.

Source code in tests/test_logging_config.py
11
12
13
14
15
16
17
18
19
20
21
22
23
def test_log_tool_decorator_wraps_and_logs(caplog):
    """Check that log_tool preserves the result and emits a start log.

    A small function is decorated with ``log_tool("demo")`` and called while
    records are captured; its return value must be unchanged and at least one
    record from the ``tools.demo`` logger must contain ``start args=``.
    """
    caplog.set_level(logging.DEBUG)

    @log_tool("demo")
    def add(a, b):
        return a + b

    with caplog.at_level(logging.DEBUG, logger="tools.demo"):
        total = add(2, 3)

    # The decorator must not alter the wrapped function's result.
    assert total == 5

    # Only records emitted by the decorated tool's own logger are considered.
    demo_messages = [rec.message for rec in caplog.records if rec.name == "tools.demo"]
    assert any("start args=" in text for text in demo_messages)

tests.test_static_analysis

test_comprehensive_triage_non_pe_with_iocs

test_comprehensive_triage_non_pe_with_iocs(tmp_path)

Run comprehensive triage on a small non-PE file and assert key sections.

Source code in tests/test_static_analysis.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def test_comprehensive_triage_non_pe_with_iocs(tmp_path):
    """Triage a small non-PE file and check the main sections of the report.

    The payload embeds a URL, a defanged domain, an IPv4 address, a BTC
    address and an ETH address, so each IOC counter is expected to be >= 1.
    """
    payload = (
        b"Hello http://example.com [.]defanged.com IP 8.8.8.8 "
        b"BTC bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080 ETH 0xabcDEF1234567890abcDEF1234567890abcDEF12"
    )
    sample = write_bytes(tmp_path, "triage.bin", payload)

    report = start_triage.invoke({"path": sample.as_posix(), "strings_min_len": 3})
    assert isinstance(report, dict)

    # Basic file facts: size, md5 digest and detected type.
    basic_info = report.get("basic_info", {})
    assert basic_info.get("size_bytes") == len(payload)
    assert basic_info.get("md5") == hashlib.md5(payload).hexdigest()
    assert basic_info.get("type") in ("Unknown", "PE", "ELF")

    # Entropy section: a float value computed over the whole payload.
    entropy_info = report.get("shannon_entropy", {})
    assert isinstance(entropy_info.get("entropy"), float)
    assert entropy_info.get("sampled_bytes") == len(payload)

    # Every IOC category planted in the payload must be counted at least once.
    ioc_counts = report.get("iocs", {}).get("counts", {})
    for category in ("urls", "domains", "ipv4s", "btc_addresses", "eth_addresses"):
        assert ioc_counts.get(category, 0) >= 1

    # stable_strings should be a list.
    stable_strings = report.get("stable_strings", [])
    assert isinstance(stable_strings, list)

    # The advanced indicators section must expose all expected keys.
    advanced = report.get("advanced_indicators", {})
    expected_keys = ("packer_indicators", "suspicious_characteristics", "anti_analysis", "obfuscation")
    missing_keys = [key for key in expected_keys if key not in advanced]
    assert not missing_keys

write_bytes

write_bytes(tmp_path, name, data)

Helper: write bytes to a temp file and return its Path.

Source code in tests/test_static_analysis.py
5
6
7
8
9
def write_bytes(tmp_path, name: str, data: bytes) -> Path:
    """Write ``data`` to the file ``name`` under ``tmp_path``.

    Args:
        tmp_path: Directory path object in which the file is created.
        name: File name, joined onto ``tmp_path``.
        data: Raw bytes to store in the file.

    Returns:
        The path of the file that was written.
    """
    target = tmp_path.joinpath(name)
    target.write_bytes(data)
    return target