CYBERDUDEBIVASH

CYBERSECURITY & AI TECH BLOG
WWW.CYBERDUDEBIVASH.COM

MalwareBazaar API Quick-Start Script (CYBERDUDEBIVASH EDITION)

CYBERDUDEBIVASH


 Daily Threat Intel by CyberDudeBivash
Zero-days, exploit breakdowns, IOCs, detection rules & mitigation playbooks.
WWW.CYBERDUDEBIVASH.COM CYBERDUDEBIVASH PVT LTD


#!/usr/bin/env python3
"""
MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)
Defensive usage: query metadata for triage, threat intel, IOC enrichment.

Features:
- Query by hash (sha256/md5/sha1)
- Get recent samples
- Search by tag or signature
- Save results to JSON and optional CSV
- Optional download (OFF by default) for controlled lab-only use

Docs: https://bazaar.abuse.ch/api/
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import time
from typing import Any, Dict, List, Optional

import requests

# Base endpoint of the MalwareBazaar API; every request in this script is POSTed here.
API_URL = "https://mb-api.abuse.ch/api/v1/"


def post_api(payload: Dict[str, Any], timeout: int = 20) -> Dict[str, Any]:
    """
    POST a form-encoded query to the MalwareBazaar API and return the decoded JSON.

    The MalwareBazaar API documentation describes an ``Auth-Key`` request
    header for authenticated access. When the ``MALWAREBAZAAR_AUTH_KEY``
    environment variable is set to a non-empty value, it is sent as that
    header; when it is unset, the request is sent without extra headers.

    Args:
        payload: Form fields of the query, e.g. ``{"query": "get_info", ...}``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    # Passing headers=None keeps the request identical to an unauthenticated call.
    headers = {"Auth-Key": auth_key} if auth_key else None
    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()
    return r.json()


def write_json(path: str, obj: Any) -> None:
    """
    Serialize *obj* to *path* as indented UTF-8 JSON.

    The parent directory of *path* is created when it does not exist yet;
    a bare file name is written relative to the current directory.
    Non-ASCII characters are written as-is rather than escaped.
    """
    parent_dir = os.path.dirname(path)
    if not parent_dir:
        parent_dir = "."
    os.makedirs(parent_dir, exist_ok=True)

    with open(path, mode="w", encoding="utf-8") as handle:
        json.dump(obj, handle, ensure_ascii=False, indent=2)


def write_csv(path: str, rows: List[Dict[str, Any]], field_order: Optional[List[str]] = None) -> None:
    """
    Write *rows* to a CSV file at *path*, one line per row dict.

    Nothing is written (and no file is created) when *rows* is empty.

    Args:
        path: Destination file; its parent directory is created if missing.
        rows: Row dicts. List and dict values are stored as JSON strings so
            that each value fits in a single CSV cell.
        field_order: Exact columns to emit, in order. When omitted, the
            common MalwareBazaar columns come first, followed by every other
            key found in *any* row, in first-seen order. Keys a row lacks
            are written as empty cells.
    """
    if not rows:
        return
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    if field_order is None:
        # common MalwareBazaar keys, kept first for a stable column layout
        field_order = [
            "sha256_hash", "md5_hash", "sha1_hash", "file_name", "file_type",
            "file_type_mime", "file_size", "first_seen", "last_seen", "reporter",
            "signature", "tags", "intelligence",
        ]
        # Scan every row, not just the first one: a key that only appears in
        # a later row would otherwise be dropped from the output silently.
        seen = set(field_order)
        for row in rows:
            for key in row:
                if key not in seen:
                    seen.add(key)
                    field_order.append(key)

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=field_order)
        writer.writeheader()
        for row in rows:
            record = {}
            for key in field_order:
                value = row.get(key, "")
                # flatten nested structures into a single JSON cell
                if isinstance(value, (list, dict)):
                    value = json.dumps(value, ensure_ascii=False)
                record[key] = value
            writer.writerow(record)


def normalize_rows(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract the sample records of a MalwareBazaar response as a list.

    A successful response usually looks like
      {"query_status":"ok","data":[{...},{...}]}
    A list under "data" is returned as-is and a single dict is wrapped in a
    one-element list. Any other "query_status", or any other shape of
    "data", yields an empty list.
    """
    succeeded = resp.get("query_status") == "ok"
    records = resp.get("data") if succeeded else None

    if isinstance(records, dict):
        return [records]
    return records if isinstance(records, list) else []


def download_sample(sha256: str, out_dir: str, timeout: int = 60) -> str:
    """
    Lab-only: download the sample zip from MalwareBazaar into *out_dir*.

    Sends ``query=get_file`` with ``sha256_hash=<sha256>``. When the
    ``MALWAREBAZAAR_AUTH_KEY`` environment variable is set to a non-empty
    value, it is sent as the ``Auth-Key`` request header.

    Args:
        sha256: SHA-256 of the sample as 64 hexadecimal characters.
        out_dir: Directory to store the archive in; created if missing.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Path of the written file, ``<out_dir>/<sha256>.zip``.

    Raises:
        ValueError: If *sha256* is not a 64-character hex string. The value
            becomes part of the output file name, so anything else (path
            separators, ``..``) is rejected before any I/O happens.
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the response body is not a zip archive, e.g. an
            error document returned instead of the file.
    """
    hex_digits = "0123456789abcdefABCDEF"
    if (
        not isinstance(sha256, str)
        or len(sha256) != 64
        or any(ch not in hex_digits for ch in sha256)
    ):
        raise ValueError(f"Not a valid SHA-256 hex digest: {sha256!r}")

    os.makedirs(out_dir, exist_ok=True)
    payload = {"query": "get_file", "sha256_hash": sha256}
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    headers = {"Auth-Key": auth_key} if auth_key else None
    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()

    # A zip archive starts with the bytes "PK"; anything else is not the
    # sample archive and must not be saved under a .zip name.
    if not r.content.startswith(b"PK"):
        raise RuntimeError(
            f"MalwareBazaar did not return a zip archive for {sha256}: {r.text[:200]!r}"
        )

    out_path = os.path.join(out_dir, f"{sha256}.zip")
    with open(out_path, "wb") as f:
        f.write(r.content)
    return out_path


def _save_outputs(out_path: str, csv_path: str, resp: Dict[str, Any], rows: List[Dict[str, Any]]) -> None:
    """Write the JSON response (always) and the CSV rows (when requested), reporting only what was really written."""
    write_json(out_path, resp)
    print(f"[CYBERDUDEBIVASH] Saved JSON: {out_path}")
    if not csv_path:
        return
    if rows:
        write_csv(csv_path, rows)
        print(f"[CYBERDUDEBIVASH] Saved CSV: {csv_path}")
    else:
        # write_csv creates no file for an empty result, so do not claim it did.
        print(f"[CYBERDUDEBIVASH] No rows returned; CSV not written: {csv_path}")


def main() -> int:
    """
    Command-line entry point: parse arguments, run one query, save the results.

    Sub-commands are ``hash``, ``recent``, ``tag`` and ``signature``. Each
    writes the API response as JSON to ``--out`` and, when ``--csv`` is
    given and rows were returned, the rows as CSV.

    Returns:
        0 after a completed sub-command, 2 if no known sub-command matched.

    Raises:
        SystemExit: With a message when ``hash --download`` is requested but
            the response contains no usable ``sha256_hash``.
    """
    ap = argparse.ArgumentParser(description="MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    # hash
    p_hash = sub.add_parser("hash", help="Query by hash (sha256/md5/sha1)")
    p_hash.add_argument("--value", required=True, help="Hash value to search")
    p_hash.add_argument("--out", default="out/mb_hash.json", help="Output JSON path")
    p_hash.add_argument("--csv", default="", help="Optional CSV output path")
    p_hash.add_argument("--download", action="store_true", help="(LAB ONLY) Download sample zip (OFF by default)")
    p_hash.add_argument("--download-dir", default="out/downloads", help="Download directory (when --download)")

    # recent
    p_recent = sub.add_parser("recent", help="Get recent samples")
    p_recent.add_argument("--limit", type=int, default=50, help="Number of recent items (practical limit applies)")
    p_recent.add_argument("--out", default="out/mb_recent.json", help="Output JSON path")
    p_recent.add_argument("--csv", default="", help="Optional CSV output path")

    # tag
    p_tag = sub.add_parser("tag", help="Search by tag (e.g. 'stealer', 'ransomware')")
    p_tag.add_argument("--value", required=True, help="Tag to search")
    p_tag.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_tag.add_argument("--out", default="out/mb_tag.json", help="Output JSON path")
    p_tag.add_argument("--csv", default="", help="Optional CSV output path")

    # signature
    p_sig = sub.add_parser("signature", help="Search by signature (family)")
    p_sig.add_argument("--value", required=True, help="Signature/family to search")
    p_sig.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_sig.add_argument("--out", default="out/mb_signature.json", help="Output JSON path")
    p_sig.add_argument("--csv", default="", help="Optional CSV output path")

    args = ap.parse_args()

    if args.cmd == "hash":
        resp = post_api({"query": "get_info", "hash": args.value})
        rows = normalize_rows(resp)
        _save_outputs(args.out, args.csv, resp, rows)

        # Optional lab-only download: choose sha256 from response if available
        if args.download:
            if not rows:
                raise SystemExit("No data returned; cannot download.")
            sha256 = rows[0].get("sha256_hash")
            if not sha256:
                raise SystemExit("No sha256_hash in response; cannot download.")
            path = download_sample(sha256, args.download_dir)
            print(f"[CYBERDUDEBIVASH] Downloaded sample ZIP to: {path}")
        return 0

    if args.cmd == "recent":
        # NOTE(review): the public API docs appear to list only "time" and
        # "100" as selector values - confirm other numbers are accepted.
        resp = post_api({"query": "get_recent", "selector": str(args.limit)})
        rows = normalize_rows(resp)
        _save_outputs(args.out, args.csv, resp, rows)
        return 0

    # tag / signature share one flow; only the query name and field differ.
    lookups = {
        "tag": ("get_taginfo", "tag"),
        "signature": ("get_siginfo", "signature"),
    }
    if args.cmd in lookups:
        query, field = lookups[args.cmd]
        payload = {"query": query, field: args.value}
        if args.limit > 0:
            # Ask the server for the wanted amount as well; without this the
            # server default caps the result no matter how large --limit is.
            # 1000 is the documented maximum for the `limit` field.
            payload["limit"] = str(min(args.limit, 1000))
        resp = post_api(payload)
        rows = normalize_rows(resp)
        # best-effort limit client-side; a zero or negative limit means
        # "no limit" instead of slicing rows off the end of the list
        if rows and args.limit > 0:
            rows = rows[: args.limit]
            resp = {"query_status": "ok", "data": rows, "note": "client_side_limit_applied"}
        _save_outputs(args.out, args.csv, resp, rows)
        return 0

    return 2


# Script entry point: run the CLI and exit with the status code it returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)


Real-world usage examples

1) Hash enrichment (SOC IOC triage)

python malbaz_quickstart.py hash --value <SHA256_OR_MD5_OR_SHA1> --out out/hash.json --csv out/hash.csv

2) Pull latest samples for daily hunting

python malbaz_quickstart.py recent --limit 100 --out out/recent.json --csv out/recent.csv

3) Hunt by tag (e.g., “stealer”, “ransomware”)

python malbaz_quickstart.py tag --value stealer --limit 50 --out out/tag_stealer.json

4) Search by malware family/signature

python malbaz_quickstart.py signature --value "AgentTesla" --limit 50 --out out/agenttesla.json

5) (LAB ONLY) Download the sample ZIP for analysis in an isolated sandbox

python malbaz_quickstart.py hash --value <SHA256> --download --download-dir out/downloads

SOC best-practice notes (CYBERDUDEBIVASH authority)

  • Use this script for metadata enrichment and IOC pipelines, not for “auto-detonation.”

  • Keep downloads OFF by default and only enable them in a sandbox environment.

  • Ship the JSON/CSV output to your SIEM, then correlate it with:

    • endpoint process telemetry

    • DNS / proxy logs

    • authentication anomalies

    • email/phishing events



#cyberdudebivash #CyberDudeBivash #MalwareBazaar #ThreatIntel #MalwareAnalysis #SOC #ThreatHunting #DFIR #IncidentResponse #DetectionEngineering #IOC #YARA #ReverseEngineering #SecurityAutomation #PythonSecurity #SIEM #Splunk #Elastic #CyberDefense #CyberSecurity