Skip to content

Agent Graph

src.agent.graph

build_graph

build_graph()

Builds and compiles the agent `StateGraph` (init_file_path → static_agent and cti_analysis → supervisor → END).

Source code in src/agent/graph.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def build_graph():
    """Build and compile the agent state graph.

    Topology: ``init_file_path`` is the entry point and has edges to both
    ``static_agent`` and ``cti_analysis``; each of those has an edge to
    ``supervisor``, which has an edge to ``END``.

    Returns:
        Whatever ``StateGraph.compile()`` returns for this topology.
    """
    graph = StateGraph(State)

    # (node name, handler) pairs; each handler is wrapped in a RunnableLambda.
    node_handlers = (
        ("init_file_path", init_file_path_node),
        ("static_agent", static_agent_node),
        ("cti_analysis", cti_analysis_node),
        ("supervisor", supervisor_node),
    )
    for node_name, handler in node_handlers:
        graph.add_node(node_name, RunnableLambda(handler))

    graph.set_entry_point("init_file_path")

    # (source, target) pairs, registered in the original order.
    transitions = (
        ("init_file_path", "static_agent"),
        ("init_file_path", "cti_analysis"),
        ("static_agent", "supervisor"),
        ("cti_analysis", "supervisor"),
        ("supervisor", END),
    )
    for source, target in transitions:
        graph.add_edge(source, target)

    return graph.compile()

run_graph

run_graph(file_path, hint=None, model='gemini-2.0-flash')

Builds the graph, invokes it with the initial state, and returns the `final` entry of the result (an empty dict if absent).

Source code in src/agent/graph.py
38
39
40
41
42
43
def run_graph(file_path: str, hint: Optional[str] = None, model: str = "gemini-2.0-flash") -> dict:
    """Build the graph, run it once and return the ``final`` entry of the result.

    Args:
        file_path: Stored under the ``file_path`` key of the initial state.
        hint: Optional hint; ``None`` (or any falsy value) is stored as ``""``.
        model: Stored under the ``model`` key of the initial state.

    Returns:
        The ``final`` entry of the state returned by ``invoke``, or ``{}``
        when that key is absent.
    """
    compiled = build_graph()
    initial_state: State = {
        "file_path": file_path,
        "hint": hint if hint else "",
        "model": model,
    }
    log.info("run_graph init: file_path=%s model=%s", initial_state["file_path"], model)
    result_state = compiled.invoke(initial_state)
    return result_state.get("final", {})

src.agent.cti_agent

Threat Intel agent: wraps provider tools and normalization.

cti_from_hash

cti_from_hash(sha256)

Query VT, MalwareBazaar, Hybrid-Analysis, and OTX for a hash.

Source code in src/agent/cti_agent.py
40
41
42
43
44
45
46
47
48
def cti_from_hash(sha256: str) -> Dict:
    """Query VT, MalwareBazaar, Hybrid-Analysis, and OTX for a hash.

    Args:
        sha256: Hash to look up; a falsy value is replaced by ``""``.

    Returns:
        A dict with the keys ``ti_vt``, ``ti_mb``, ``ti_ha`` and ``ti_otx``,
        each holding the result of invoking the corresponding provider tool.
    """
    # Named ``indicator`` so the builtin ``hash`` is not shadowed.
    indicator = sha256 or ""

    # Providers are queried one after another, in the same order as the keys.
    vt_report = vt_lookup_tool.invoke(indicator)
    mb_report = malwarebazaar_lookup_tool.invoke(indicator)
    ha_report = hybrid_analysis_lookup_tool.invoke(indicator)
    otx_report = otx_query_ioc_tool.invoke(indicator)

    return {
        "ti_vt": vt_report,
        "ti_mb": mb_report,
        "ti_ha": ha_report,
        "ti_otx": otx_report,
    }

hybrid_analysis_lookup_tool

hybrid_analysis_lookup_tool(sha256)

Hybrid Analysis hash search (full JSON).

Source code in src/agent/cti_agent.py
34
35
36
37
38
@tool
@log_tool("hybrid_analysis_lookup_tool")
def hybrid_analysis_lookup_tool(sha256: str) -> dict:
    """Hybrid Analysis hash search (full JSON)."""
    # NOTE(review): with @tool the docstring presumably doubles as the tool
    # description, so it is kept short — confirm before expanding it.
    # Thin wrapper: the lookup result is passed through unchanged.
    report = hybrid_analysis_lookup(sha256)
    return report

malwarebazaar_lookup_tool

malwarebazaar_lookup_tool(hash_value)

MalwareBazaar hash lookup (full JSON).

Source code in src/agent/cti_agent.py
22
23
24
25
26
@tool
@log_tool("malwarebazaar_lookup_tool")
def malwarebazaar_lookup_tool(hash_value: str) -> dict:
    """MalwareBazaar hash lookup (full JSON)."""
    # NOTE(review): with @tool the docstring presumably doubles as the tool
    # description, so it is kept short — confirm before expanding it.
    # Thin wrapper: the lookup result is passed through unchanged.
    report = malwarebazaar_lookup(hash_value)
    return report

normalize_cti

normalize_cti(vt, mb, ha, otx, sha256)

Normalize providers into a single structure for a hash indicator.

Source code in src/agent/cti_agent.py
50
51
52
def normalize_cti(vt: dict, mb: dict, ha: dict, otx: dict, sha256: str) -> Dict:
    """Merge the provider results for one hash into a single structure.

    Args:
        vt: VirusTotal result.
        mb: MalwareBazaar result.
        ha: Hybrid Analysis result.
        otx: AlienVault OTX result.
        sha256: The hash the provider results refer to.

    Returns:
        Whatever ``normalize_hash`` returns for these inputs.
    """
    # Pure delegation; arguments are forwarded positionally in the same order.
    normalized = normalize_hash(vt, mb, ha, otx, sha256)
    return normalized

otx_query_ioc_tool

otx_query_ioc_tool(ioc)

AlienVault OTX query (auto-routes by IOC type).

Source code in src/agent/cti_agent.py
28
29
30
31
32
@tool
@log_tool("otx_query_ioc_tool")
def otx_query_ioc_tool(ioc: str) -> dict:
    """AlienVault OTX query (auto-routes by IOC type)."""
    # NOTE(review): with @tool the docstring presumably doubles as the tool
    # description, so it is kept short — confirm before expanding it.
    # Thin wrapper: the query result is passed through unchanged.
    report = otx_query_ioc(ioc)
    return report

vt_lookup_tool

vt_lookup_tool(sha256)

VirusTotal file lookup (full JSON).

Source code in src/agent/cti_agent.py
16
17
18
19
20
@tool
@log_tool("vt_lookup_tool")
def vt_lookup_tool(sha256: str) -> dict:
    """VirusTotal file lookup (full JSON)."""
    # NOTE(review): with @tool the docstring presumably doubles as the tool
    # description, so it is kept short — confirm before expanding it.
    # Thin wrapper: the lookup result is passed through unchanged.
    report = vt_lookup(sha256)
    return report

src.agent.static_agent

Static triage agent: wraps pure static-analysis functions as LangChain tools.

This agent does not use an LLM. It collects evidence (hashes, PE, imports, sections, version, strings, indicators, YARA, CAPA) and returns a dictionary that the supervisor (LLM) can summarize later.

start_triage

start_triage(path, strings_min_len=4)

Run full triage: basic info, imports, sections, version, strings, signatures, indicators, YARA, CAPA.

Source code in src/agent/static_agent.py
18
19
20
21
22
23
24
25
@tool
@log_tool("extract_comprehensive_triage_data")
def start_triage(path: str, strings_min_len: int = 4) -> Dict[str, Any]:
    """Run full triage: basic info, imports, sections, version, strings, signatures, indicators, YARA, CAPA."""
    # NOTE(review): the log_tool label above differs from this function's
    # name (start_triage) — confirm that is intentional.
    try:
        triage = extract_triage_data(path, strings_min_len)
    except Exception as exc:
        # Best effort: any failure is reported to the caller as an error
        # dict instead of being raised.
        return {"error": str(exc)}
    else:
        return triage