Alright. Here is the safest and most practical version for what you need.

Goals of this version

This script is designed so that:

  • CLI: input comes from a file, no editing the script
  • results are written to file immediately, per domain
  • on Ctrl+C / disconnect / error, results already processed stay safe
  • resume support
  • a summary
  • results are split into:
    • live
    • warning
    • offline
  • it can detect:
    • LIVE 200
    • DEFAULT HOSTING PAGE
    • PARKED
    • FOR SALE
    • COMING SOON
    • SUSPENDED
    • EXPIRED / RENEWAL ISSUE
    • SERVER ERROR
    • FORBIDDEN
    • NOT FOUND
    • SSL invalid
    • HTTP only
    • redirect
  • safer against overload because:
    • single-thread / sequential by default
    • explicit timeouts
    • capped response body
    • retries only for light connection errors
  • output files:
    • all_results.csv
    • hidup_results.csv
    • live_ok_results.csv
    • warning_results.csv
    • offline_results.csv
    • hidup_domains.txt
    • live_ok_domains.txt
    • offline_domains.txt
    • warning_domains.txt
    • summary.txt
    • progress.log
    • current_domain.txt

Install dependencies

pip install requests dnspython beautifulsoup4
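
To confirm the dependencies import cleanly before a long run, a quick check like this should print ok:

python3 -c "import requests, dns.resolver, bs4; print('ok')"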

The best version of the script

Save it as:

domain_status_cli.py

(The full script is at the end of this message.)

How to use

1. Prepare the input file

For example, domains.txt:

trazam.com
deudex.com
4bx-cd.com
j3kram.com
edzzup.com
usaiptv.tv

2. Run it

python3 domain_status_cli.py -i domains.txt -o hasil

3. If you want more safety/stability for a large list

I suggest:

python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 6 --read-timeout 8 --dns-timeout 3 --delay 0.2

4. Start over from scratch

python3 domain_status_cli.py -i domains.txt -o hasil --fresh

5. Automatic resume

If the script ran before and then stopped, just run it again:

python3 domain_status_cli.py -i domains.txt -o hasil

It reads all_results.csv and skips anything already processed.
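
To see in advance what resume will skip, a small sketch like this reads the same domain column the script uses (assuming the hasil output folder):

import csv

# Collect domains already recorded in all_results.csv (the resume source).
with open("hasil/all_results.csv", newline="", encoding="utf-8") as f:
    done = {(row.get("domain") or "").strip().lower() for row in csv.DictReader(f)}
print(len(done), "domains already processed")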


What the output files mean

all_results.csv

Every result.

hidup_results.csv

Every domain that is still reachable:

  • normal
  • warning
  • suspended
  • renewal issue
  • 403
  • 404
  • 503
  • parked
  • default page
  • etc.

live_ok_results.csv

Only websites that are genuinely normal/live:

  • LIVE 200
  • LIVE NON-HTML

warning_results.csv

The domain is still alive/reachable, but something is wrong:

  • DEFAULT HOSTING PAGE
  • PARKED
  • FOR SALE
  • SUSPENDED
  • EXPIRED / RENEWAL ISSUE
  • SERVER ERROR
  • FORBIDDEN
  • NOT FOUND
  • etc.

offline_results.csv

The ones that genuinely failed:

  • no DNS
  • timeout
  • connection failed
  • request error
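
Since offline_domains.txt contains one plain domain per line, you can feed it straight back in for a later re-check (hasil_recheck is just an example output folder):

python3 domain_status_cli.py -i hasil/offline_domains.txt -o hasil_recheck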

summary.txt

Running totals, updated continuously while the process runs.
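
Because summary.txt is rewritten atomically while the run is in progress, it is safe to watch it live, for example (assuming the hasil output folder):

watch -n 5 cat hasil/summary.txt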

current_domain.txt

The domain currently being processed.

progress.log

The process log.


Why this version is better than the previous one

1. Safe when the process stops

After each domain finishes:

  • the CSV row is written immediately
  • it is flushed immediately
  • it is fsync'd immediately

So if:

  • Ctrl+C
  • the terminal disconnects
  • the server restarts
  • the process crashes

the results written so far are still there.
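
The durability comes down to the pattern sync_file implements in the script; a minimal sketch of the same idea:

import os

def durable_append(path, line):
    # Append one line, flush Python's buffer, then ask the OS to push it to disk.
    with open(path, "a", encoding="utf-8") as f:
        f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())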


2. Lower risk of hanging

I added:

  • a connect timeout
  • a read timeout
  • a DNS timeout
  • a response body cap
  • light retries
  • sequential execution

So it is safer than a script that reads huge responses with no limit.
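
The body cap is just streaming plus an early stop; a minimal sketch of the idea (read_limited_text in the script is the full version; example.com is a placeholder):

import requests

# Stream the body and stop after max_bytes instead of loading all of it.
resp = requests.get("http://example.com", stream=True, timeout=(5, 8))
data, max_bytes = b"", 262144
for chunk in resp.iter_content(chunk_size=8192):
    data += chunk
    if len(data) >= max_bytes:
        break
resp.close()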


3. It distinguishes "alive" from "alive but broken"

This matters.

Examples:

  • 200 is not necessarily a normal site
  • 503 is not necessarily dead
  • 403 is not necessarily offline
  • a default hosting page is not a normal website
  • suspended / expired domains still respond, too
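
If you later want just one problem category, a quick filter over warning_results.csv works (assuming the hasil output folder):

import csv

# Print only domains classified as suspended or expired/renewal issue.
with open("hasil/warning_results.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        if row["page_type"] in ("SUSPENDED", "EXPIRED / RENEWAL ISSUE"):
            print(row["domain"], "-", row["page_type"])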

Important note

If the process is stopped with:

kill -9 PID

then no Python script can trap that.
But results for domains that had already finished are still safe.
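
For long runs where the terminal might drop on its own, running the script in the background is also an option (standard shell usage, nothing specific to this script):

nohup python3 domain_status_cli.py -i domains.txt -o hasil > checker.out 2>&1 &
tail -f hasil/progress.log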


Recommended commands for real-world use

If you have many domains:

python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 6 --read-timeout 8 --dns-timeout 3 --delay 0.1

If the server is small / you are worried about overload:

python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 5 --read-timeout 7 --dns-timeout 2 --delay 0.3

If you want to go further

I can help build an even more "premium" final version, for example adding:

  1. active nameserver / registrar parking pattern checks
  2. detection of redirects to other domains
  3. meta robots / canonical checks
  4. XLSX export
  5. multi-threading kept safe via a separate writer
  6. a progress bar
  7. filtering for very large inputs
  8. a dedicated expired/suspended scan mode

If you like, my suggested next step: I build a v2 with columns closer to HeadMasterSEO plus more detailed per-category summary statistics.

Here is the full script (domain_status_cli.py):

import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import time
import warnings
from collections import Counter
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse

import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry

warnings.simplefilter("ignore", InsecureRequestWarning)

# =========================
# CONFIG / CONSTANTS
# =========================

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}

DEFAULT_CONNECT_TIMEOUT = 8
DEFAULT_READ_TIMEOUT = 10
DEFAULT_DNS_TIMEOUT = 3
DEFAULT_MAX_REDIRECTS = 10
DEFAULT_MAX_BYTES = 262144  # 256 KB
DEFAULT_RETRIES = 1

FOR_SALE_KEYWORDS = [
    "domain is for sale",
    "buy this domain",
    "this domain may be for sale",
    "purchase this domain",
    "afternic",
    "sedo",
    "dan.com",
    "undeveloped",
]

PARKED_KEYWORDS = [
    "domain parked",
    "parked free",
    "parkingcrew",
    "bodis",
    "cashparking",
    "sedo parking",
    "parked domain",
    "this domain is parked",
]

DEFAULT_HOSTING_KEYWORDS = [
    "apache2 ubuntu default page",
    "apache2 debian default page",
    "welcome to nginx",
    "nginx test page",
    "test page for the nginx",
    "default web site page",
    "iis windows server",
]

COMING_SOON_KEYWORDS = [
    "coming soon",
    "under construction",
    "launching soon",
    "website coming soon",
    "site is coming soon",
]

SUSPENDED_KEYWORDS = [
    "this account has been suspended",
    "account suspended",
    "website suspended",
    "site suspended",
    "hosting account has been suspended",
    "suspended due to non-payment",
    "please contact billing",
    "contact your hosting provider",
    "billing issue",
]

EXPIRED_KEYWORDS = [
    "this domain has expired",
    "domain expired",
    "expired domain",
    "renew this domain",
    "renewal required",
    "domain renewal",
    "renew now",
    "expiration notice",
    "registrant verification failed",
    "has expired and may be available",
]

FIELDNAMES = [
    "checked_at",
    "domain",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "status_code",
    "ssl_status",
    "content_type",
    "elapsed_ms",
    "best_start_url",
    "best_final_url",
    "title",
    "notes",
    "error",
    "redirect_chain",
    "A",
    "AAAA",
    "CNAME",
    "MX",
    "NS",
    "all_attempts",
]

STOP_REQUESTED = False
SIGNAL_COUNT = 0

# ANSI color
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"


# =========================
# SAFE FILE WRITERS
# =========================

def sync_file(f):
    f.flush()
    os.fsync(f.fileno())


def atomic_write_text(path, text):
    # Write to a temp file in the same directory, then atomically swap it into place.
    dirname = os.path.dirname(path) or "."
    os.makedirs(dirname, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=dirname)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            sync_file(f)
        os.replace(tmp, path)
    finally:
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    def __init__(self, path, fieldnames):
        self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if not file_exists:
            self.writer.writeheader()
            sync_file(self.f)

    def writerow(self, row):
        self.writer.writerow(row)
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    def __init__(self, path):
        self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        self.f.write(text.rstrip("\n") + "\n")
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


# =========================
# UTIL
# =========================

def now_utc():
    return datetime.now(timezone.utc).isoformat()


def remove_if_exists(path):
    try:
        if os.path.exists(path):
            os.remove(path)
    except Exception:
        pass


def write_current_domain(path, domain):
    atomic_write_text(path, domain.strip() + "\n")


def clear_current_domain(path):
    atomic_write_text(path, "")


def clean_text(s):
    if not s:
        return ""
    return re.sub(r"\s+", " ", s).strip()


def normalize_domain(raw):
    s = raw.strip()
    if not s or s.startswith("#"):
        return ""

    s = s.split("#", 1)[0].strip()
    if not s:
        return ""

    if "://" not in s:
        s = "http://" + s

    try:
        p = urlparse(s)
        host = p.netloc or p.path
        host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
        if host.startswith("www."):
            host = host[4:]
        return host
    except Exception:
        return ""


def load_domains(input_file):
    domains = []
    seen = set()

    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            d = normalize_domain(line)
            if d and d not in seen:
                seen.add(d)
                domains.append(d)

    return domains


def load_processed_domains(all_results_csv):
    processed = set()
    if not os.path.exists(all_results_csv) or os.path.getsize(all_results_csv) == 0:
        return processed

    try:
        with open(all_results_csv, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                domain = (row.get("domain") or "").strip().lower()
                if domain:
                    processed.add(domain)
    except Exception:
        pass

    return processed


def host_of(url):
    try:
        return (urlparse(url).hostname or "").lower()
    except Exception:
        return ""


def is_html_like(content_type):
    ct = (content_type or "").lower()
    return any(x in ct for x in [
        "text/html",
        "application/xhtml+xml",
        "text/plain",
        "application/xml",
        "text/xml",
    ])


def extract_title(html):
    if not html:
        return ""

    try:
        soup = BeautifulSoup(html, "html.parser")
        if soup.title and soup.title.string:
            return clean_text(soup.title.string)
    except Exception:
        pass

    m = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
    if m:
        return clean_text(m.group(1))
    return ""


def colorize_bucket(bucket, text):
    if bucket == "HIDUP":
        return f"{C_GREEN}{text}{C_RESET}"
    if bucket == "WARNING":
        return f"{C_YELLOW}{text}{C_RESET}"
    return f"{C_RED}{text}{C_RESET}"


# =========================
# SIGNAL HANDLING
# =========================

def signal_handler(signum, frame):
    global STOP_REQUESTED, SIGNAL_COUNT
    SIGNAL_COUNT += 1

    if SIGNAL_COUNT == 1:
        STOP_REQUESTED = True
        print(
            f"\n{C_YELLOW}Signal diterima. Script akan berhenti setelah domain saat ini selesai.{C_RESET}\n"
            f"{C_DIM}Tekan Ctrl+C sekali lagi untuk paksa berhenti sekarang.{C_RESET}"
        )
    else:
        raise KeyboardInterrupt


# =========================
# DNS
# =========================

def get_dns_info(domain, dns_timeout):
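    # Query A/AAAA/CNAME/MX/NS. NXDOMAIN aborts early; other per-record failures are tolerated.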
    result = {
        "dns_ok": False,
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "MX": [],
        "NS": [],
        "dns_error": "",
    }

    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout

    for rtype in ["A", "AAAA", "CNAME", "MX", "NS"]:
        try:
            answers = resolver.resolve(domain, rtype)
            values = []
            for r in answers:
                if rtype == "MX":
                    values.append(str(r.exchange).rstrip("."))
                elif hasattr(r, "target"):
                    values.append(str(r.target).rstrip("."))
                else:
                    values.append(str(r).rstrip("."))
            result[rtype] = values
        except dns.resolver.NXDOMAIN:
            result["dns_error"] = "NXDOMAIN"
            return result
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            pass
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__

    if any(result[k] for k in ["A", "AAAA", "CNAME", "MX", "NS"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"

    return result


# =========================
# HTTP / PROBE
# =========================

def build_session(retries):
    session = requests.Session()

    retry_cfg = Retry(
        total=retries,
        connect=retries,
        read=0,
        redirect=0,
        status=0,
        backoff_factor=0.3,
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
    )

    adapter = HTTPAdapter(max_retries=retry_cfg, pool_connections=10, pool_maxsize=10)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def read_limited_text(resp, max_bytes):
    chunks = []
    total = 0
    try:
        for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
            if not chunk:
                continue
            remain = max_bytes - total
            if remain <= 0:
                break
            if len(chunk) > remain:
                chunk = chunk[:remain]
            chunks.append(chunk)
            total += len(chunk)
            if total >= max_bytes:
                break
    except Exception:
        pass

    raw = b"".join(chunks)
    enc = resp.encoding or "utf-8"
    try:
        return raw.decode(enc, errors="replace")
    except Exception:
        return raw.decode("utf-8", errors="replace")


def classify_page(status_code, title, body, content_type):
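    # Keyword checks run before status-code checks, so a parked/suspended page wins even on HTTP 200.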
    blob = ((title or "") + "\n" + (body or "")[:8000]).lower()

    if any(k in blob for k in EXPIRED_KEYWORDS):
        return "EXPIRED / RENEWAL ISSUE"

    if any(k in blob for k in SUSPENDED_KEYWORDS):
        return "SUSPENDED"

    if any(k in blob for k in FOR_SALE_KEYWORDS):
        return "FOR SALE"

    if any(k in blob for k in PARKED_KEYWORDS):
        return "PARKED"

    if any(k in blob for k in DEFAULT_HOSTING_KEYWORDS):
        return "DEFAULT HOSTING PAGE"

    if any(k in blob for k in COMING_SOON_KEYWORDS):
        return "COMING SOON"

    if 200 <= status_code <= 299:
        if content_type and not is_html_like(content_type):
            return "LIVE NON-HTML"
        return "LIVE 200"

    if status_code in (301, 302, 303, 307, 308):
        return "REDIRECT"

    if status_code == 401:
        return "UNAUTHORIZED"
    if status_code == 403:
        return "FORBIDDEN"
    if status_code == 404:
        return "NOT FOUND"
    if status_code == 410:
        return "GONE"
    if status_code == 429:
        return "RATE LIMITED"
    if 500 <= status_code <= 599:
        return "SERVER ERROR"

    return f"HTTP {status_code}"


def score_result(r):
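    # Higher score = more informative attempt; best_probe keeps the max across the four candidate URLs.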
    if not r["ok"]:
        if r["page_type"] == "TOO MANY REDIRECTS" and r["chain"]:
            return 70
        return 0

    pt = r["page_type"]
    sc = r["status_code"]

    if pt == "LIVE 200":
        return 100
    if pt == "LIVE NON-HTML":
        return 98
    if pt in [
        "EXPIRED / RENEWAL ISSUE",
        "SUSPENDED",
        "FOR SALE",
        "PARKED",
        "DEFAULT HOSTING PAGE",
        "COMING SOON",
    ]:
        return 95
    if sc == 403:
        return 85
    if sc == 401:
        return 84
    if 500 <= sc <= 599:
        return 82
    if sc in (404, 410):
        return 80
    if 300 <= sc < 400:
        return 75
    return 10


def probe_url(session, start_url, connect_timeout, read_timeout, max_redirects, max_bytes):
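    # Redirects are followed manually (allow_redirects=False) so the chain is recorded and capped.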
    current_url = start_url
    chain = []
    ssl_status = "N/A"

    for _ in range(max_redirects):
        resp = None

        try:
            resp = session.get(
                current_url,
                headers=HEADERS,
                timeout=(connect_timeout, read_timeout),
                allow_redirects=False,
                verify=True,
                stream=True,
            )
            if current_url.startswith("https://"):
                ssl_status = "VALID"

        except requests.exceptions.SSLError:
            ssl_status = "INVALID"
            try:
                resp = session.get(
                    current_url,
                    headers=HEADERS,
                    timeout=(connect_timeout, read_timeout),
                    allow_redirects=False,
                    verify=False,
                    stream=True,
                )
            except requests.exceptions.Timeout:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "TIMEOUT",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": "Timeout",
                }
            except requests.exceptions.ConnectionError:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "CONNECTION FAILED",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": "ConnectionError",
                }
            except Exception as e:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "SSL ERROR",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": type(e).__name__,
                }

        except requests.exceptions.Timeout:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "TIMEOUT",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": "Timeout",
            }

        except requests.exceptions.ConnectionError:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "CONNECTION FAILED",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": "ConnectionError",
            }

        except Exception as e:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "REQUEST ERROR",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": type(e).__name__,
            }

        try:
            elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
        except Exception:
            elapsed_ms = ""

        chain.append(f"{resp.status_code} {current_url}")

        if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
            next_url = urljoin(current_url, resp.headers.get("Location"))
            try:
                resp.close()
            except Exception:
                pass
            current_url = next_url
            continue

        content_type = resp.headers.get("Content-Type", "")
        body = ""
        if resp.status_code not in (204, 304):
            body = read_limited_text(resp, max_bytes)

        title = extract_title(body)
        page_type = classify_page(resp.status_code, title, body, content_type)

        final_url = resp.url
        try:
            resp.close()
        except Exception:
            pass

        return {
            "ok": True,
            "start_url": start_url,
            "final_url": final_url,
            "status_code": resp.status_code,
            "page_type": page_type,
            "title": title,
            "chain": " | ".join(chain),
            "ssl_status": ssl_status,
            "content_type": content_type,
            "elapsed_ms": elapsed_ms,
            "error": "",
        }

    return {
        "ok": False,
        "start_url": start_url,
        "final_url": current_url,
        "status_code": "",
        "page_type": "TOO MANY REDIRECTS",
        "title": "",
        "chain": " | ".join(chain),
        "ssl_status": ssl_status,
        "content_type": "",
        "elapsed_ms": "",
        "error": "TooManyRedirects",
    }


def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries):
    candidates = [
        f"https://{domain}",
        f"https://www.{domain}",
        f"http://{domain}",
        f"http://www.{domain}",
    ]

    results = []
    with build_session(retries) as session:
        for url in candidates:
            r = probe_url(
                session=session,
                start_url=url,
                connect_timeout=connect_timeout,
                read_timeout=read_timeout,
                max_redirects=max_redirects,
                max_bytes=max_bytes,
            )
            results.append(r)

    best = max(results, key=score_result)
    return results, best


# =========================
# CLASSIFICATION / NOTES
# =========================

def classify_bucket(best):
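    # HIDUP = genuinely live; WARNING = any HTTP response (or a partial redirect chain); OFFLINE = nothing.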
    pt = best["page_type"]
    sc = best["status_code"]

    if pt in ["LIVE 200", "LIVE NON-HTML"]:
        return "HIDUP"

    if sc != "":
        return "WARNING"

    if pt == "TOO MANY REDIRECTS" and best["chain"]:
        return "WARNING"

    return "OFFLINE"


def build_notes(domain, dns_info, best):
    notes = []

    if not dns_info["dns_ok"]:
        notes.append("DNS problem")

    if best["ssl_status"] == "INVALID":
        notes.append("SSL invalid")

    final_url = best["final_url"] or ""
    start_url = best["start_url"] or ""
    final_host = host_of(final_url)

    if start_url and final_url and start_url != final_url:
        notes.append("Redirected")

    if final_url.startswith("http://"):
        notes.append("HTTP only")

    if final_host and final_host not in {domain, f"www.{domain}"}:
        notes.append(f"Redirect to other host: {final_host}")

    if best["page_type"] == "DEFAULT HOSTING PAGE":
        notes.append("Server default page")
    elif best["page_type"] == "PARKED":
        notes.append("Parked domain")
    elif best["page_type"] == "FOR SALE":
        notes.append("Domain for sale")
    elif best["page_type"] == "COMING SOON":
        notes.append("Coming soon page")
    elif best["page_type"] == "SUSPENDED":
        notes.append("Suspended; check hosting/billing")
    elif best["page_type"] == "EXPIRED / RENEWAL ISSUE":
        notes.append("Expired/renewal issue")
    elif best["page_type"] == "SERVER ERROR":
        notes.append("Website reachable but server error")
    elif best["page_type"] == "NOT FOUND":
        notes.append("Host reachable but page not found")
    elif best["page_type"] == "FORBIDDEN":
        notes.append("Host reachable but blocked/forbidden")
    elif best["page_type"] == "CONNECTION FAILED":
        notes.append("Cannot connect to web server")
    elif best["page_type"] == "TIMEOUT":
        notes.append("Request timeout")

    ct = (best["content_type"] or "").strip()
    if ct and not is_html_like(ct):
        notes.append(f"Non-HTML content: {ct}")

    return "; ".join(notes)


def summarize_domain(domain, connect_timeout, read_timeout, dns_timeout, max_redirects, max_bytes, retries):
    dns_info = get_dns_info(domain, dns_timeout)
    attempts, best = best_probe(
        domain=domain,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        max_redirects=max_redirects,
        max_bytes=max_bytes,
        retries=retries,
    )

    bucket = classify_bucket(best)
    notes = build_notes(domain, dns_info, best)

    row = {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": bucket,
        "page_type": best["page_type"],
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        "status_code": best["status_code"],
        "ssl_status": best["ssl_status"],
        "content_type": best["content_type"],
        "elapsed_ms": best["elapsed_ms"],
        "best_start_url": best["start_url"],
        "best_final_url": best["final_url"],
        "title": best["title"],
        "notes": notes,
        "error": best["error"],
        "redirect_chain": best["chain"],
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "MX": ", ".join(dns_info["MX"]),
        "NS": ", ".join(dns_info["NS"]),
        "all_attempts": " || ".join(
            f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
            for r in attempts
        ),
    }

    return row


def fallback_error_row(domain, err_msg):
    return {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": "OFFLINE",
        "page_type": "SCRIPT ERROR",
        "dns_ok": "",
        "dns_error": "",
        "status_code": "",
        "ssl_status": "",
        "content_type": "",
        "elapsed_ms": "",
        "best_start_url": "",
        "best_final_url": "",
        "title": "",
        "notes": "Internal script error",
        "error": err_msg,
        "redirect_chain": "",
        "A": "",
        "AAAA": "",
        "CNAME": "",
        "MX": "",
        "NS": "",
        "all_attempts": "",
    }


# =========================
# SUMMARY
# =========================

def update_summary(path, total_input, processed_now, skipped_resume, counts_bucket, counts_type, current_domain):
    lines = []
    lines.append("DOMAIN STATUS CHECKER SUMMARY")
    lines.append("=" * 40)
    lines.append(f"generated_at     : {now_utc()}")
    lines.append(f"total_input      : {total_input}")
    lines.append(f"processed_now    : {processed_now}")
    lines.append(f"skipped_resume   : {skipped_resume}")
    lines.append(f"remaining_est    : {max(total_input - processed_now - skipped_resume, 0)}")
    lines.append(f"current_domain   : {current_domain or '-'}")
    lines.append("")
    lines.append("BUCKET COUNTS")
    lines.append("-" * 40)
    for key in ["HIDUP", "WARNING", "OFFLINE"]:
        lines.append(f"{key:15}: {counts_bucket.get(key, 0)}")

    lines.append("")
    lines.append("PAGE TYPE COUNTS")
    lines.append("-" * 40)
    for k, v in counts_type.most_common():
        lines.append(f"{k:30}: {v}")

    lines.append("")
    lines.append("KETERANGAN")
    lines.append("-" * 40)
    lines.append("HIDUP    = website normal/live")
    lines.append("WARNING  = domain reachable tapi bermasalah / bukan website normal")
    lines.append("OFFLINE  = tidak reachable / DNS gagal / timeout / connection failed")

    atomic_write_text(path, "\n".join(lines) + "\n")


# =========================
# CLI
# =========================

def parse_args():
    parser = argparse.ArgumentParser(
        description="CLI checker status domain yang lebih mirip HeadMasterSEO-style"
    )
    parser.add_argument("-i", "--input", required=True, help="File input domain (.txt)")
    parser.add_argument("-o", "--output", default="results", help="Folder output")
    parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout (detik)")
    parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout (detik)")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout (detik)")
    parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Batas redirect")
    parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Maksimum body yang dibaca")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Retry koneksi ringan")
    parser.add_argument("--delay", type=float, default=0.0, help="Delay antar domain (detik)")
    parser.add_argument("--fresh", action="store_true", help="Hapus output lama dan mulai dari awal")
    parser.add_argument("--no-resume", action="store_true", help="Jangan skip domain yang sudah diproses")
    return parser.parse_args()


def main():
    global STOP_REQUESTED

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()
    os.makedirs(args.output, exist_ok=True)

    all_csv = os.path.join(args.output, "all_results.csv")
    hidup_csv = os.path.join(args.output, "hidup_results.csv")          # HIDUP + WARNING (everything reachable)
    live_ok_csv = os.path.join(args.output, "live_ok_results.csv")      # HIDUP only
    warning_csv = os.path.join(args.output, "warning_results.csv")      # WARNING only
    offline_csv = os.path.join(args.output, "offline_results.csv")      # OFFLINE only

    hidup_txt = os.path.join(args.output, "hidup_domains.txt")
    live_ok_txt = os.path.join(args.output, "live_ok_domains.txt")
    warning_txt = os.path.join(args.output, "warning_domains.txt")
    offline_txt = os.path.join(args.output, "offline_domains.txt")

    summary_txt = os.path.join(args.output, "summary.txt")
    progress_log = os.path.join(args.output, "progress.log")
    current_file = os.path.join(args.output, "current_domain.txt")

    if args.fresh:
        for p in [
            all_csv, hidup_csv, live_ok_csv, warning_csv, offline_csv,
            hidup_txt, live_ok_txt, warning_txt, offline_txt,
            summary_txt, progress_log, current_file
        ]:
            remove_if_exists(p)

    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)

    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue = [d for d in domains if d not in processed]

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
    live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
    warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
    offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)

    hidup_list_writer = SafeLineWriter(hidup_txt)
    live_ok_list_writer = SafeLineWriter(live_ok_txt)
    warning_list_writer = SafeLineWriter(warning_txt)
    offline_list_writer = SafeLineWriter(offline_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_bucket = Counter()
    counts_type = Counter()
    processed_now = 0

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses  : {len(queue)}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    log_writer.write_line(
        f"RUN START {now_utc()} total_input={len(domains)} skipped_resume={len(processed)} pending={len(queue)}"
    )

    update_summary(
        path=summary_txt,
        total_input=len(domains),
        processed_now=processed_now,
        skipped_resume=len(processed),
        counts_bucket=counts_bucket,
        counts_type=counts_type,
        current_domain="",
    )

    try:
        for idx, domain in enumerate(queue, 1):
            if STOP_REQUESTED:
                break

            write_current_domain(current_file, domain)
            log_writer.write_line(f"START {now_utc()} {domain}")

            try:
                row = summarize_domain(
                    domain=domain,
                    connect_timeout=args.connect_timeout,
                    read_timeout=args.read_timeout,
                    dns_timeout=args.dns_timeout,
                    max_redirects=args.max_redirects,
                    max_bytes=args.max_bytes,
                    retries=args.retries,
                )
            except KeyboardInterrupt:
                raise
            except Exception as e:
                row = fallback_error_row(domain, f"{type(e).__name__}: {e}")

            # Always write all first
            all_writer.writerow(row)

            # split
            if row["bucket"] in ("HIDUP", "WARNING"):
                hidup_writer.writerow(row)
                hidup_list_writer.write_line(domain)

            if row["bucket"] == "HIDUP":
                live_ok_writer.writerow(row)
                live_ok_list_writer.write_line(domain)
            elif row["bucket"] == "WARNING":
                warning_writer.writerow(row)
                warning_list_writer.write_line(domain)
            else:
                offline_writer.writerow(row)
                offline_list_writer.write_line(domain)

            counts_bucket[row["bucket"]] += 1
            counts_type[row["page_type"]] += 1
            processed_now += 1

            update_summary(
                path=summary_txt,
                total_input=len(domains),
                processed_now=processed_now,
                skipped_resume=len(processed),
                counts_bucket=counts_bucket,
                counts_type=counts_type,
                current_domain=domain,
            )

            bucket_colored = colorize_bucket(row["bucket"], row["bucket"])
            code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
            print(
                f"[{idx}/{len(queue)}] "
                f"{domain:30} -> {bucket_colored:20} | "
                f"{row['page_type'][:28]:28} | "
                f"{code_text:4} | "
                f"{str(row['elapsed_ms'])[:6]:6} ms | "
                f"{row['best_final_url'][:45]}"
            )

            log_writer.write_line(
                f"DONE {now_utc()} {domain} "
                f"bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
            )

            clear_current_domain(current_file)

            if args.delay > 0 and not STOP_REQUESTED:
                time.sleep(args.delay)

        update_summary(
            path=summary_txt,
            total_input=len(domains),
            processed_now=processed_now,
            skipped_resume=len(processed),
            counts_bucket=counts_bucket,
            counts_type=counts_type,
            current_domain="",
        )

        print("-" * 110)
        if STOP_REQUESTED:
            print(f"{C_YELLOW}Berhenti secara graceful.{C_RESET}")
        else:
            print(f"{C_GREEN}Selesai.{C_RESET}")

        print(f"All CSV       : {all_csv}")
        print(f"Hidup CSV     : {hidup_csv}")
        print(f"Live OK CSV   : {live_ok_csv}")
        print(f"Warning CSV   : {warning_csv}")
        print(f"Offline CSV   : {offline_csv}")
        print(f"Summary       : {summary_txt}")

        log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now} stop_requested={STOP_REQUESTED}")

    except KeyboardInterrupt:
        print(f"\n{C_RED}Dihentikan paksa.{C_RESET}")
        print("Hasil domain yang sudah selesai dicek tetap aman tersimpan.")
        log_writer.write_line(f"RUN INTERRUPTED {now_utc()} processed_now={processed_now}")

    finally:
        all_writer.close()
        hidup_writer.close()
        live_ok_writer.close()
        warning_writer.close()
        offline_writer.close()

        hidup_list_writer.close()
        live_ok_list_writer.close()
        warning_list_writer.close()
        offline_list_writer.close()
        log_writer.close()


if __name__ == "__main__":
    main()