Skip to content

API App

src.api.app

delete_by_id

delete_by_id(rec_id)

Delete a stored analysis by id.

Source code in src/api/app.py
142
143
144
145
146
147
148
149
@app.delete("/analyses/{rec_id}")
def delete_by_id(rec_id: str):
    """Remove the stored analysis identified by ``rec_id``.

    Returns a confirmation payload carrying the count reported by the
    storage layer and the id that was requested.

    Raises:
        HTTPException: 404 when storage reports a count of zero or less.
    """
    log.info("/analyses delete id=%s", rec_id)
    removed = storage_delete_analysis_by_id(rec_id)
    if removed <= 0:
        # Nothing was removed, so the id is reported as unknown.
        raise HTTPException(status_code=404, detail="analysis not found")
    return dict(deleted=True, count=removed, id=rec_id)

fetch_by_id

fetch_by_id(rec_id)

Return stored analysis JSON for a specific record id.

Source code in src/api/app.py
133
134
135
136
137
138
139
140
@app.get("/analyses/{rec_id}")
def fetch_by_id(rec_id: str):
    """Look up one stored analysis by its record id and return it.

    Raises:
        HTTPException: 404 when the storage lookup yields ``None``.
    """
    log.info("/analyses fetch id=%s", rec_id)
    record = storage_get_analysis_by_id(rec_id)
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail="analysis not found")

fetch_by_sha256

fetch_by_sha256(hash)

Return stored analysis JSON for the most recent record with given sha256.

Source code in src/api/app.py
124
125
126
127
128
129
130
131
@app.get("/analyses/sha256/{hash}")
def fetch_by_sha256(hash: str):
    """Return the stored analysis JSON looked up by sha256 digest.

    The parameter keeps the name ``hash`` because it is bound to the
    ``{hash}`` placeholder in the route path.

    Raises:
        HTTPException: 404 when the lookup yields ``None``.
    """
    log.info("/analyses/sha256 fetch hash=%s", hash)
    record = get_analysis_by_sha256(hash)
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail="analysis not found")

healthz

healthz()

Lightweight healthcheck endpoint.

Source code in src/api/app.py
40
41
42
43
@app.get("/healthz")
def healthz():
    """Report service liveness with a constant ``{"status": "ok"}`` body."""
    payload = {"status": "ok"}
    return payload

list_analyses

list_analyses(page=Query(1, ge=1), page_size=Query(20, ge=1, le=200), sha256=Query(None), sha1=Query(None), md5=Query(None), date_from=Query(None, description='ISO8601 inclusive lower bound'), date_to=Query(None, description='ISO8601 inclusive upper bound'))

List analyses with pagination and optional filters.

Items exclude heavy result payloads; use fetch endpoints to retrieve full result JSON.

Source code in src/api/app.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@app.get("/analyses")
def list_analyses(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=200),
    sha256: str | None = Query(None),
    sha1: str | None = Query(None),
    md5: str | None = Query(None),
    date_from: str | None = Query(None, description="ISO8601 inclusive lower bound"),
    date_to: str | None = Query(None, description="ISO8601 inclusive upper bound"),
):
    """Return one page of analyses, optionally narrowed by hash or date filters.

    All arguments are forwarded unchanged to the storage layer. Listed items
    omit the heavy result payloads; use the fetch endpoints to retrieve the
    full result JSON for a record.
    """
    # Hash filters are logged only as present/absent flags, not as values.
    log.info(
        "/analyses list page=%s size=%s filters sha256=%s sha1=%s md5=%s from=%s to=%s",
        page,
        page_size,
        bool(sha256),
        bool(sha1),
        bool(md5),
        date_from,
        date_to,
    )
    filters = {
        "sha256": sha256,
        "sha1": sha1,
        "md5": md5,
        "date_from": date_from,
        "date_to": date_to,
    }
    return storage_list_analyses(page=page, page_size=page_size, **filters)

purge_by_sha256

purge_by_sha256(sha256=Query(..., description='sha256 to purge'))

Delete all records for a given sha256.

Source code in src/api/app.py
151
152
153
154
155
156
@app.post("/analyses/purge")
def purge_by_sha256(sha256: str = Query(..., description="sha256 to purge")):
    """Purge every stored record matching the given sha256.

    Returns the value reported by the storage layer under ``purged``
    together with the sha256 that was requested.
    """
    log.info("/analyses/purge sha256=%s", sha256)
    purged = storage_purge_analyses_by_sha256(sha256)
    return dict(purged=purged, sha256=sha256)

ti_lookup

ti_lookup(req)

Lookup a hash across TI providers and return a normalized summary.

Supports sha256 (VT, HA, MB, OTX) and md5 (MB, OTX).

Source code in src/api/app.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@app.post("/ti/hash")
def ti_lookup(req: HashLookup):
    """Query the TI providers for a hash and return a normalized summary.

    sha256 hashes are sent to VT, HA, MB and OTX; md5 hashes are sent to
    MB and OTX only.

    Raises:
        HTTPException: 400 when the hash is missing or blank, or when its
            detected type is neither sha256 nor md5.
    """
    digest = (req.hash or "").strip()
    if not digest:
        raise HTTPException(status_code=400, detail="hash is required")

    kind = detect_ioc_type(digest)
    if kind not in ("sha256", "md5"):
        raise HTTPException(status_code=400, detail=f"unsupported hash type: {kind}")

    # VT and HA are queried for sha256 only; md5 leaves both results as None.
    if kind == "sha256":
        vt_result = vt_lookup(digest)
        ha_result = hybrid_analysis_lookup(digest)
    else:
        vt_result = None
        ha_result = None
    mb_result = malwarebazaar_lookup(digest)
    otx_result = otx_query_ioc(digest)

    return normalize_hash(vt_result, mb_result, ha_result, otx_result, digest)