Understood. Here is the safest and most practical version for your needs.
Goals of this version
This script is designed so that:
- CLI: input comes from a file, no editing the script
- each domain's result is written to file immediately
- if Ctrl+C / a disconnect / an error happens, results already collected stay safe
- resume support
- a running summary
- results are split into buckets:
  hidup / warning / offline
- it can detect:
  LIVE 200, DEFAULT HOSTING PAGE, PARKED, FOR SALE, COMING SOON, SUSPENDED, EXPIRED / RENEWAL ISSUE, SERVER ERROR, FORBIDDEN, NOT FOUND, SSL invalid, HTTP only, redirect
- it is safer against overload because:
  - single-thread / sequential by default
  - explicit timeouts
  - the response body is capped
  - retries only for light connection errors
- output files:
  all_results.csv, hidup_results.csv, live_ok_results.csv, warning_results.csv, offline_results.csv, hidup_domains.txt, live_ok_domains.txt, offline_domains.txt, warning_domains.txt, summary.txt, progress.log, current_domain.txt
Install dependencies
pip install requests dnspython beautifulsoup4
The script
Save it as:
domain_status_cli.py
(The full source is included at the end of this message.)
How to use
1. Prepare an input file
For example, domains.txt:
trazam.com
deudex.com
4bx-cd.com
j3kram.com
edzzup.com
usaiptv.tv
2. Run it
python3 domain_status_cli.py -i domains.txt -o hasil
3. For a safer/more stable run on a large list
Recommended:
python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 6 --read-timeout 8 --dns-timeout 3 --delay 0.2
4. Restart from scratch
python3 domain_status_cli.py -i domains.txt -o hasil --fresh
5. Automatic resume
If the script ran before and then stopped, just run it again:
python3 domain_status_cli.py -i domains.txt -o hasil
It will read all_results.csv and skip every domain that has already been processed.
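Conceptually, the resume step only needs the domain column of all_results.csv. A minimal standalone sketch of the idea (the script's own load_processed_domains below works the same way):

```python
import csv
import io

def processed_domains(csv_text):
    """Collect the domains already recorded in an all_results.csv body."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {
        (row.get("domain") or "").strip().lower()
        for row in reader
        if (row.get("domain") or "").strip()
    }

# Two domains finished in an earlier run, so only the third is queued.
sample = "domain,bucket\ntrazam.com,HIDUP\ndeudex.com,OFFLINE\n"
done = processed_domains(sample)
queue = [d for d in ["trazam.com", "deudex.com", "edzzup.com"] if d not in done]
```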
What each output file means
all_results.csv
All results.
hidup_results.csv
Every domain that is still reachable:
- normal
- warning
- suspended
- renewal issue
- 403
- 404
- 503
- parked
- default page
- etc.
live_ok_results.csv
Only websites that are genuinely normal/live:
- LIVE 200
- LIVE NON-HTML
warning_results.csv
Domains that are still alive/reachable but have a problem:
- DEFAULT HOSTING PAGE
- PARKED
- FOR SALE
- SUSPENDED
- EXPIRED / RENEWAL ISSUE
- SERVER ERROR
- FORBIDDEN
- NOT FOUND
- etc.
offline_results.csv
The ones that truly failed:
- no DNS
- timeout
- connection failed
- request error
summary.txt
Overall totals, updated continuously while the run is in progress.
current_domain.txt
The domain currently being processed.
progress.log
The process log.
Why this version is better than the previous one
1. Safe if the process stops
After each domain finishes, the script:
- writes the CSV row immediately
- flushes immediately
- fsyncs immediately
So if:
- Ctrl+C is pressed
- the terminal disconnects
- the server restarts
- the process crashes
the results collected so far are still there.
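The write-flush-fsync sequence can be seen in isolation in this small sketch (an illustration of the pattern, not the script's SafeLineWriter itself):

```python
import os
import tempfile

def append_durable(path, line):
    """Append one line and push it all the way to disk before returning."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(line.rstrip("\n") + "\n")
        f.flush()             # drain Python's userspace buffer to the OS
        os.fsync(f.fileno())  # ask the OS to commit the page cache to disk

path = os.path.join(tempfile.mkdtemp(), "demo_results.csv")
append_durable(path, "trazam.com,HIDUP")
append_durable(path, "deudex.com,OFFLINE")
with open(path, encoding="utf-8") as f:
    saved = f.read().splitlines()
```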
2. Lower risk of hanging
This version adds:
- a connect timeout
- a read timeout
- a DNS timeout
- a cap on the response body
- light retries
- sequential execution
That makes it safer than a script that reads large responses without any limit.
3. It distinguishes "alive" from "alive but problematic"
This matters. For example:
- 200 is not necessarily a normal site
- 503 is not necessarily dead
- 403 is not necessarily offline
- a default hosting page is not a normal website
- suspended/expired domains still respond
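A tiny illustration of the first point, using the same keyword idea as the script's classify_page: a response can be HTTP 200 and still be a suspension page.

```python
# Hypothetical mini-classifier; the real script uses much larger keyword lists.
SUSPENDED_HINTS = ["account suspended", "this account has been suspended"]

def looks_suspended(status_code, body):
    """True when a 200 response body reads like a hosting-suspension page."""
    blob = (body or "").lower()
    return status_code == 200 and any(k in blob for k in SUSPENDED_HINTS)

suspended_page = "<html><title>Account Suspended</title></html>"
normal_page = "<html><title>Welcome to our shop</title></html>"
```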
Important note
If the process is killed with:
kill -9 PID
then no Python script can trap that signal.
Even so, results for domains that finished earlier remain safe.
Recommended commands for real-world use
For a long domain list:
python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 6 --read-timeout 8 --dns-timeout 3 --delay 0.1
For a small server / to avoid overload:
python3 domain_status_cli.py -i domains.txt -o hasil --connect-timeout 5 --read-timeout 7 --dns-timeout 2 --delay 0.3
If you want to go further
I can help build an even more "premium" final version, for example adding:
- nameserver / registrar parking-pattern checks
- detection of redirects to other domains
- meta robots / canonical checks
- XLSX export
- multi-threading that stays safe via a separate writer
- a progress bar
- filtering for very large input files
- a scan mode focused on expired/suspended domains
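For the "multi-thread but still safe with a separate writer" idea, the usual pattern is worker threads pushing finished rows into a queue that a single consumer drains, so only one thread ever touches the output files. A hedged sketch (names hypothetical, not part of the script above):

```python
import queue
import threading

def run_parallel(domains, check, writerow, workers=4):
    """Check domains in parallel but funnel every write through one thread."""
    rows = queue.Queue()

    def worker(chunk):
        for d in chunk:
            rows.put(check(d))
        rows.put(None)  # sentinel: this worker is done

    chunks = [domains[i::workers] for i in range(workers)]
    threads = [threading.Thread(target=worker, args=(c,)) for c in chunks]
    for t in threads:
        t.start()

    finished = 0
    while finished < workers:
        item = rows.get()
        if item is None:
            finished += 1
        else:
            writerow(item)  # only this thread writes output
    for t in threads:
        t.join()

out = []
run_parallel(["a.com", "b.com", "c.com"], lambda d: {"domain": d}, out.append, workers=2)
```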
If you want, my suggested next step:
I build a v2 with columns closer to HeadMasterSEO plus more detailed per-category summary statistics.
import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import time
import warnings
from collections import Counter
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse

import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry

warnings.simplefilter("ignore", InsecureRequestWarning)

# =========================
# CONFIG / CONSTANTS
# =========================
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}

DEFAULT_CONNECT_TIMEOUT = 8
DEFAULT_READ_TIMEOUT = 10
DEFAULT_DNS_TIMEOUT = 3
DEFAULT_MAX_REDIRECTS = 10
DEFAULT_MAX_BYTES = 262144  # 256 KB
DEFAULT_RETRIES = 1

FOR_SALE_KEYWORDS = [
    "domain is for sale",
    "buy this domain",
    "this domain may be for sale",
    "purchase this domain",
    "afternic",
    "sedo",
    "dan.com",
    "undeveloped",
]

PARKED_KEYWORDS = [
    "domain parked",
    "parked free",
    "parkingcrew",
    "bodis",
    "cashparking",
    "sedo parking",
    "parked domain",
    "this domain is parked",
]

DEFAULT_HOSTING_KEYWORDS = [
    "apache2 ubuntu default page",
    "apache2 debian default page",
    "welcome to nginx",
    "nginx test page",
    "test page for the nginx",
    "default web site page",
    "iis windows server",
]

COMING_SOON_KEYWORDS = [
    "coming soon",
    "under construction",
    "launching soon",
    "website coming soon",
    "site is coming soon",
]

SUSPENDED_KEYWORDS = [
    "this account has been suspended",
    "account suspended",
    "website suspended",
    "site suspended",
    "hosting account has been suspended",
    "suspended due to non-payment",
    "please contact billing",
    "contact your hosting provider",
    "billing issue",
]

EXPIRED_KEYWORDS = [
    "this domain has expired",
    "domain expired",
    "expired domain",
    "renew this domain",
    "renewal required",
    "domain renewal",
    "renew now",
    "expiration notice",
    "registrant verification failed",
    "has expired and may be available",
]

FIELDNAMES = [
    "checked_at",
    "domain",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "status_code",
    "ssl_status",
    "content_type",
    "elapsed_ms",
    "best_start_url",
    "best_final_url",
    "title",
    "notes",
    "error",
    "redirect_chain",
    "A",
    "AAAA",
    "CNAME",
    "MX",
    "NS",
    "all_attempts",
]

STOP_REQUESTED = False
SIGNAL_COUNT = 0

# ANSI colors
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"

# =========================
# SAFE FILE WRITERS
# =========================
def sync_file(f):
    f.flush()
    os.fsync(f.fileno())


def atomic_write_text(path, text):
    # Write to a temp file in the same directory, then atomically replace.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            sync_file(f)
        os.replace(tmp, path)
    finally:
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    def __init__(self, path, fieldnames):
        self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if not file_exists:
            self.writer.writeheader()
            sync_file(self.f)

    def writerow(self, row):
        self.writer.writerow(row)
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    def __init__(self, path):
        self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        self.f.write(text.rstrip("\n") + "\n")
        sync_file(self.f)

    def close(self):
        try:
            self.f.close()
        except Exception:
            pass


# =========================
# UTIL
# =========================
def now_utc():
    return datetime.now(timezone.utc).isoformat()


def remove_if_exists(path):
    try:
        if os.path.exists(path):
            os.remove(path)
    except Exception:
        pass


def write_current_domain(path, domain):
    atomic_write_text(path, domain.strip() + "\n")


def clear_current_domain(path):
    atomic_write_text(path, "")


def clean_text(s):
    if not s:
        return ""
    return re.sub(r"\s+", " ", s).strip()


def normalize_domain(raw):
    s = raw.strip()
    if not s or s.startswith("#"):
        return ""
    s = s.split("#", 1)[0].strip()
    if not s:
        return ""
    if "://" not in s:
        s = "http://" + s
    try:
        p = urlparse(s)
        host = p.netloc or p.path
        host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
        if host.startswith("www."):
            host = host[4:]
        return host
    except Exception:
        return ""


def load_domains(input_file):
    domains = []
    seen = set()
    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            d = normalize_domain(line)
            if d and d not in seen:
                seen.add(d)
                domains.append(d)
    return domains


def load_processed_domains(all_results_csv):
    processed = set()
    if not os.path.exists(all_results_csv) or os.path.getsize(all_results_csv) == 0:
        return processed
    try:
        with open(all_results_csv, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                domain = (row.get("domain") or "").strip().lower()
                if domain:
                    processed.add(domain)
    except Exception:
        pass
    return processed


def host_of(url):
    try:
        return (urlparse(url).hostname or "").lower()
    except Exception:
        return ""


def is_html_like(content_type):
    ct = (content_type or "").lower()
    return any(x in ct for x in [
        "text/html",
        "application/xhtml+xml",
        "text/plain",
        "application/xml",
        "text/xml",
    ])


def extract_title(html):
    if not html:
        return ""
    try:
        soup = BeautifulSoup(html, "html.parser")
        if soup.title and soup.title.string:
            return clean_text(soup.title.string)
    except Exception:
        pass
    m = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
    if m:
        return clean_text(m.group(1))
    return ""


def colorize_bucket(bucket, text):
    if bucket == "HIDUP":
        return f"{C_GREEN}{text}{C_RESET}"
    if bucket == "WARNING":
        return f"{C_YELLOW}{text}{C_RESET}"
    return f"{C_RED}{text}{C_RESET}"


# =========================
# SIGNAL HANDLING
# =========================
def signal_handler(signum, frame):
    global STOP_REQUESTED, SIGNAL_COUNT
    SIGNAL_COUNT += 1
    if SIGNAL_COUNT == 1:
        STOP_REQUESTED = True
        print(
            f"\n{C_YELLOW}Signal received. The script will stop after the current domain finishes.{C_RESET}\n"
            f"{C_DIM}Press Ctrl+C once more to force-stop immediately.{C_RESET}"
        )
    else:
        raise KeyboardInterrupt


# =========================
# DNS
# =========================
def get_dns_info(domain, dns_timeout):
    result = {
        "dns_ok": False,
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "MX": [],
        "NS": [],
        "dns_error": "",
    }
    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout
    for rtype in ["A", "AAAA", "CNAME", "MX", "NS"]:
        try:
            answers = resolver.resolve(domain, rtype)
            values = []
            for r in answers:
                if rtype == "MX":
                    values.append(str(r.exchange).rstrip("."))
                elif hasattr(r, "target"):
                    values.append(str(r.target).rstrip("."))
                else:
                    values.append(str(r).rstrip("."))
            result[rtype] = values
        except dns.resolver.NXDOMAIN:
            result["dns_error"] = "NXDOMAIN"
            return result
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            pass
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__
    if any(result[k] for k in ["A", "AAAA", "CNAME", "MX", "NS"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"
    return result


# =========================
# HTTP / PROBE
# =========================
def build_session(retries):
    session = requests.Session()
    retry_cfg = Retry(
        total=retries,
        connect=retries,
        read=0,
        redirect=0,
        status=0,
        backoff_factor=0.3,
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_cfg, pool_connections=10, pool_maxsize=10)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def read_limited_text(resp, max_bytes):
    chunks = []
    total = 0
    try:
        for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
            if not chunk:
                continue
            remain = max_bytes - total
            if remain <= 0:
                break
            if len(chunk) > remain:
                chunk = chunk[:remain]
            chunks.append(chunk)
            total += len(chunk)
            if total >= max_bytes:
                break
    except Exception:
        pass
    raw = b"".join(chunks)
    enc = resp.encoding or "utf-8"
    try:
        return raw.decode(enc, errors="replace")
    except Exception:
        return raw.decode("utf-8", errors="replace")


def classify_page(status_code, title, body, content_type):
    # Keyword checks come first: suspension/parking pages often return 200.
    blob = ((title or "") + "\n" + (body or "")[:8000]).lower()
    if any(k in blob for k in EXPIRED_KEYWORDS):
        return "EXPIRED / RENEWAL ISSUE"
    if any(k in blob for k in SUSPENDED_KEYWORDS):
        return "SUSPENDED"
    if any(k in blob for k in FOR_SALE_KEYWORDS):
        return "FOR SALE"
    if any(k in blob for k in PARKED_KEYWORDS):
        return "PARKED"
    if any(k in blob for k in DEFAULT_HOSTING_KEYWORDS):
        return "DEFAULT HOSTING PAGE"
    if any(k in blob for k in COMING_SOON_KEYWORDS):
        return "COMING SOON"
    if 200 <= status_code <= 299:
        if content_type and not is_html_like(content_type):
            return "LIVE NON-HTML"
        return "LIVE 200"
    if status_code in (301, 302, 303, 307, 308):
        return "REDIRECT"
    if status_code == 401:
        return "UNAUTHORIZED"
    if status_code == 403:
        return "FORBIDDEN"
    if status_code == 404:
        return "NOT FOUND"
    if status_code == 410:
        return "GONE"
    if status_code == 429:
        return "RATE LIMITED"
    if 500 <= status_code <= 599:
        return "SERVER ERROR"
    return f"HTTP {status_code}"


def score_result(r):
    # Rank probe attempts so best_probe can pick the most informative one.
    if not r["ok"]:
        if r["page_type"] == "TOO MANY REDIRECTS" and r["chain"]:
            return 70
        return 0
    pt = r["page_type"]
    sc = r["status_code"]
    if pt == "LIVE 200":
        return 100
    if pt == "LIVE NON-HTML":
        return 98
    if pt in [
        "EXPIRED / RENEWAL ISSUE",
        "SUSPENDED",
        "FOR SALE",
        "PARKED",
        "DEFAULT HOSTING PAGE",
        "COMING SOON",
    ]:
        return 95
    if sc == 403:
        return 85
    if sc == 401:
        return 84
    if 500 <= sc <= 599:
        return 82
    if sc in (404, 410):
        return 80
    if 300 <= sc < 400:
        return 75
    return 10


def probe_url(session, start_url, connect_timeout, read_timeout, max_redirects, max_bytes):
    current_url = start_url
    chain = []
    ssl_status = "N/A"
    for _ in range(max_redirects):
        resp = None
        try:
            resp = session.get(
                current_url,
                headers=HEADERS,
                timeout=(connect_timeout, read_timeout),
                allow_redirects=False,
                verify=True,
                stream=True,
            )
            if current_url.startswith("https://"):
                ssl_status = "VALID"
        except requests.exceptions.SSLError:
            # Certificate problem: note it, then retry once without verification.
            ssl_status = "INVALID"
            try:
                resp = session.get(
                    current_url,
                    headers=HEADERS,
                    timeout=(connect_timeout, read_timeout),
                    allow_redirects=False,
                    verify=False,
                    stream=True,
                )
            except requests.exceptions.Timeout:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "TIMEOUT",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": "Timeout",
                }
            except requests.exceptions.ConnectionError:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "CONNECTION FAILED",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": "ConnectionError",
                }
            except Exception as e:
                return {
                    "ok": False,
                    "start_url": start_url,
                    "final_url": current_url,
                    "status_code": "",
                    "page_type": "SSL ERROR",
                    "title": "",
                    "chain": " | ".join(chain),
                    "ssl_status": ssl_status,
                    "content_type": "",
                    "elapsed_ms": "",
                    "error": type(e).__name__,
                }
        except requests.exceptions.Timeout:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "TIMEOUT",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": "Timeout",
            }
        except requests.exceptions.ConnectionError:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "CONNECTION FAILED",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": "ConnectionError",
            }
        except Exception as e:
            return {
                "ok": False,
                "start_url": start_url,
                "final_url": current_url,
                "status_code": "",
                "page_type": "REQUEST ERROR",
                "title": "",
                "chain": " | ".join(chain),
                "ssl_status": ssl_status,
                "content_type": "",
                "elapsed_ms": "",
                "error": type(e).__name__,
            }
        try:
            elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
        except Exception:
            elapsed_ms = ""
        chain.append(f"{resp.status_code} {current_url}")
        if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
            next_url = urljoin(current_url, resp.headers.get("Location"))
            try:
                resp.close()
            except Exception:
                pass
            current_url = next_url
            continue
        content_type = resp.headers.get("Content-Type", "")
        body = ""
        if resp.status_code not in (204, 304):
            body = read_limited_text(resp, max_bytes)
        title = extract_title(body)
        page_type = classify_page(resp.status_code, title, body, content_type)
        final_url = resp.url
        try:
            resp.close()
        except Exception:
            pass
        return {
            "ok": True,
            "start_url": start_url,
            "final_url": final_url,
            "status_code": resp.status_code,
            "page_type": page_type,
            "title": title,
            "chain": " | ".join(chain),
            "ssl_status": ssl_status,
            "content_type": content_type,
            "elapsed_ms": elapsed_ms,
            "error": "",
        }
    return {
        "ok": False,
        "start_url": start_url,
        "final_url": current_url,
        "status_code": "",
        "page_type": "TOO MANY REDIRECTS",
        "title": "",
        "chain": " | ".join(chain),
        "ssl_status": ssl_status,
        "content_type": "",
        "elapsed_ms": "",
        "error": "TooManyRedirects",
    }


def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries):
    candidates = [
        f"https://{domain}",
        f"https://www.{domain}",
        f"http://{domain}",
        f"http://www.{domain}",
    ]
    results = []
    with build_session(retries) as session:
        for url in candidates:
            r = probe_url(
                session=session,
                start_url=url,
                connect_timeout=connect_timeout,
                read_timeout=read_timeout,
                max_redirects=max_redirects,
                max_bytes=max_bytes,
            )
            results.append(r)
    best = max(results, key=score_result)
    return results, best


# =========================
# CLASSIFICATION / NOTES
# =========================
def classify_bucket(best):
    pt = best["page_type"]
    sc = best["status_code"]
    if pt in ["LIVE 200", "LIVE NON-HTML"]:
        return "HIDUP"
    if sc != "":
        return "WARNING"
    if pt == "TOO MANY REDIRECTS" and best["chain"]:
        return "WARNING"
    return "OFFLINE"


def build_notes(domain, dns_info, best):
    notes = []
    if not dns_info["dns_ok"]:
        notes.append("DNS problem")
    if best["ssl_status"] == "INVALID":
        notes.append("SSL invalid")
    final_url = best["final_url"] or ""
    start_url = best["start_url"] or ""
    final_host = host_of(final_url)
    if start_url and final_url and start_url != final_url:
        notes.append("Redirected")
    if final_url.startswith("http://"):
        notes.append("HTTP only")
    if final_host and final_host not in {domain, f"www.{domain}"}:
        notes.append(f"Redirect to other host: {final_host}")
    if best["page_type"] == "DEFAULT HOSTING PAGE":
        notes.append("Server default page")
    elif best["page_type"] == "PARKED":
        notes.append("Parked domain")
    elif best["page_type"] == "FOR SALE":
        notes.append("Domain for sale")
    elif best["page_type"] == "COMING SOON":
        notes.append("Coming soon page")
    elif best["page_type"] == "SUSPENDED":
        notes.append("Suspended; check hosting/billing")
    elif best["page_type"] == "EXPIRED / RENEWAL ISSUE":
        notes.append("Expired/renewal issue")
    elif best["page_type"] == "SERVER ERROR":
        notes.append("Website reachable but server error")
    elif best["page_type"] == "NOT FOUND":
        notes.append("Host reachable but page not found")
    elif best["page_type"] == "FORBIDDEN":
        notes.append("Host reachable but blocked/forbidden")
    elif best["page_type"] == "CONNECTION FAILED":
        notes.append("Cannot connect to web server")
    elif best["page_type"] == "TIMEOUT":
        notes.append("Request timeout")
    ct = (best["content_type"] or "").strip()
    if ct and not is_html_like(ct):
        notes.append(f"Non-HTML content: {ct}")
    return "; ".join(notes)


def summarize_domain(domain, connect_timeout, read_timeout, dns_timeout, max_redirects, max_bytes, retries):
    dns_info = get_dns_info(domain, dns_timeout)
    attempts, best = best_probe(
        domain=domain,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        max_redirects=max_redirects,
        max_bytes=max_bytes,
        retries=retries,
    )
    bucket = classify_bucket(best)
    notes = build_notes(domain, dns_info, best)
    row = {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": bucket,
        "page_type": best["page_type"],
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        "status_code": best["status_code"],
        "ssl_status": best["ssl_status"],
        "content_type": best["content_type"],
        "elapsed_ms": best["elapsed_ms"],
        "best_start_url": best["start_url"],
        "best_final_url": best["final_url"],
        "title": best["title"],
        "notes": notes,
        "error": best["error"],
        "redirect_chain": best["chain"],
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "MX": ", ".join(dns_info["MX"]),
        "NS": ", ".join(dns_info["NS"]),
        "all_attempts": " || ".join(
            f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
            for r in attempts
        ),
    }
    return row


def fallback_error_row(domain, err_msg):
    return {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": "OFFLINE",
        "page_type": "SCRIPT ERROR",
        "dns_ok": "",
        "dns_error": "",
        "status_code": "",
        "ssl_status": "",
        "content_type": "",
        "elapsed_ms": "",
        "best_start_url": "",
        "best_final_url": "",
        "title": "",
        "notes": "Internal script error",
        "error": err_msg,
        "redirect_chain": "",
        "A": "",
        "AAAA": "",
        "CNAME": "",
        "MX": "",
        "NS": "",
        "all_attempts": "",
    }


# =========================
# SUMMARY
# =========================
def update_summary(path, total_input, processed_now, skipped_resume, counts_bucket, counts_type, current_domain):
    lines = []
    lines.append("DOMAIN STATUS CHECKER SUMMARY")
    lines.append("=" * 40)
    lines.append(f"generated_at : {now_utc()}")
    lines.append(f"total_input : {total_input}")
    lines.append(f"processed_now : {processed_now}")
    lines.append(f"skipped_resume : {skipped_resume}")
    lines.append(f"remaining_est : {max(total_input - processed_now - skipped_resume, 0)}")
    lines.append(f"current_domain : {current_domain or '-'}")
    lines.append("")
    lines.append("BUCKET COUNTS")
    lines.append("-" * 40)
    for key in ["HIDUP", "WARNING", "OFFLINE"]:
        lines.append(f"{key:15}: {counts_bucket.get(key, 0)}")
    lines.append("")
    lines.append("PAGE TYPE COUNTS")
    lines.append("-" * 40)
    for k, v in counts_type.most_common():
        lines.append(f"{k:30}: {v}")
    lines.append("")
    lines.append("LEGEND")
    lines.append("-" * 40)
    lines.append("HIDUP = website is normal/live")
    lines.append("WARNING = domain reachable but problematic / not a normal website")
    lines.append("OFFLINE = not reachable / DNS failed / timeout / connection failed")
    atomic_write_text(path, "\n".join(lines) + "\n")


# =========================
# CLI
# =========================
def parse_args():
    parser = argparse.ArgumentParser(
        description="HeadMasterSEO-style domain status checker CLI"
    )
    parser.add_argument("-i", "--input", required=True, help="Input file of domains (.txt)")
    parser.add_argument("-o", "--output", default="results", help="Output folder")
    parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout (seconds)")
    parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout (seconds)")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout (seconds)")
    parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Redirect limit")
    parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Maximum response body to read")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Light connection retries")
    parser.add_argument("--delay", type=float, default=0.0, help="Delay between domains (seconds)")
    parser.add_argument("--fresh", action="store_true", help="Delete old output and start from scratch")
    parser.add_argument("--no-resume", action="store_true", help="Do not skip already-processed domains")
    return parser.parse_args()


def main():
    global STOP_REQUESTED
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    args = parse_args()
    os.makedirs(args.output, exist_ok=True)
    all_csv = os.path.join(args.output, "all_results.csv")
    hidup_csv = os.path.join(args.output, "hidup_results.csv")      # HIDUP + WARNING (everything reachable)
    live_ok_csv = os.path.join(args.output, "live_ok_results.csv")  # HIDUP only
    warning_csv = os.path.join(args.output, "warning_results.csv")  # WARNING only
    offline_csv = os.path.join(args.output, "offline_results.csv")  # OFFLINE only
    hidup_txt = os.path.join(args.output, "hidup_domains.txt")
    live_ok_txt = os.path.join(args.output, "live_ok_domains.txt")
    warning_txt = os.path.join(args.output, "warning_domains.txt")
    offline_txt = os.path.join(args.output, "offline_domains.txt")
    summary_txt = os.path.join(args.output, "summary.txt")
    progress_log = os.path.join(args.output, "progress.log")
    current_file = os.path.join(args.output, "current_domain.txt")

    if args.fresh:
        for p in [
            all_csv, hidup_csv, live_ok_csv, warning_csv, offline_csv,
            hidup_txt, live_ok_txt, warning_txt, offline_txt,
            summary_txt, progress_log, current_file
        ]:
            remove_if_exists(p)

    domains = load_domains(args.input)
    if not domains:
        print("No valid domains found in the input file.")
        sys.exit(1)

    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)
    queue = [d for d in domains if d not in processed]

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
    live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
    warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
    offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)
    hidup_list_writer = SafeLineWriter(hidup_txt)
    live_ok_list_writer = SafeLineWriter(live_ok_txt)
    warning_list_writer = SafeLineWriter(warning_txt)
    offline_list_writer = SafeLineWriter(offline_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_bucket = Counter()
    counts_type = Counter()
    processed_now = 0

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Already done   : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}To process     : {len(queue)}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    log_writer.write_line(
        f"RUN START {now_utc()} total_input={len(domains)} skipped_resume={len(processed)} pending={len(queue)}"
    )
    update_summary(
        path=summary_txt,
        total_input=len(domains),
        processed_now=processed_now,
        skipped_resume=len(processed),
        counts_bucket=counts_bucket,
        counts_type=counts_type,
        current_domain="",
    )

    try:
        for idx, domain in enumerate(queue, 1):
            if STOP_REQUESTED:
                break
            write_current_domain(current_file, domain)
            log_writer.write_line(f"START {now_utc()} {domain}")
            try:
                row = summarize_domain(
                    domain=domain,
                    connect_timeout=args.connect_timeout,
                    read_timeout=args.read_timeout,
                    dns_timeout=args.dns_timeout,
                    max_redirects=args.max_redirects,
                    max_bytes=args.max_bytes,
                    retries=args.retries,
                )
            except KeyboardInterrupt:
                raise
            except Exception as e:
                row = fallback_error_row(domain, f"{type(e).__name__}: {e}")

            # Always write the full-results CSV first
            all_writer.writerow(row)

            # Split into buckets
            if row["bucket"] in ("HIDUP", "WARNING"):
                hidup_writer.writerow(row)
                hidup_list_writer.write_line(domain)
                if row["bucket"] == "HIDUP":
                    live_ok_writer.writerow(row)
                    live_ok_list_writer.write_line(domain)
                elif row["bucket"] == "WARNING":
                    warning_writer.writerow(row)
                    warning_list_writer.write_line(domain)
            else:
                offline_writer.writerow(row)
                offline_list_writer.write_line(domain)

            counts_bucket[row["bucket"]] += 1
            counts_type[row["page_type"]] += 1
            processed_now += 1

            update_summary(
                path=summary_txt,
                total_input=len(domains),
                processed_now=processed_now,
                skipped_resume=len(processed),
                counts_bucket=counts_bucket,
                counts_type=counts_type,
                current_domain=domain,
            )

            bucket_colored = colorize_bucket(row["bucket"], row["bucket"])
            code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
            print(
                f"[{idx}/{len(queue)}] "
                f"{domain:30} -> {bucket_colored:20} | "
                f"{row['page_type'][:28]:28} | "
                f"{code_text:4} | "
                f"{str(row['elapsed_ms'])[:6]:6} ms | "
                f"{row['best_final_url'][:45]}"
            )
            log_writer.write_line(
                f"DONE {now_utc()} {domain} "
                f"bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
            )
            clear_current_domain(current_file)
            if args.delay > 0 and not STOP_REQUESTED:
                time.sleep(args.delay)

        update_summary(
            path=summary_txt,
            total_input=len(domains),
            processed_now=processed_now,
            skipped_resume=len(processed),
            counts_bucket=counts_bucket,
            counts_type=counts_type,
            current_domain="",
        )
        print("-" * 110)
        if STOP_REQUESTED:
            print(f"{C_YELLOW}Stopped gracefully.{C_RESET}")
        else:
            print(f"{C_GREEN}Finished.{C_RESET}")
        print(f"All CSV     : {all_csv}")
        print(f"Hidup CSV   : {hidup_csv}")
        print(f"Live OK CSV : {live_ok_csv}")
        print(f"Warning CSV : {warning_csv}")
        print(f"Offline CSV : {offline_csv}")
        print(f"Summary     : {summary_txt}")
        log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now} stop_requested={STOP_REQUESTED}")
    except KeyboardInterrupt:
        print(f"\n{C_RED}Force-stopped.{C_RESET}")
        print("Results for domains that were already checked remain safely saved.")
        log_writer.write_line(f"RUN INTERRUPTED {now_utc()} processed_now={processed_now}")
    finally:
        all_writer.close()
        hidup_writer.close()
        live_ok_writer.close()
        warning_writer.close()
        offline_writer.close()
        hidup_list_writer.close()
        live_ok_list_writer.close()
        warning_list_writer.close()
        offline_list_writer.close()
        log_writer.close()


if __name__ == "__main__":
    main()
main()