What you actually need are two layers:

1. A fast DNS layer
Find domains with: dns_ok = False, dns_error = NO_RECORDS.

2. A registration-data layer
For the interesting domains, fetch:
- nameservers
- registration / creation date
- expiry date
- domain age
- days remaining until expiry
- if possible, statuses such as clientHold, serverHold, etc.
The answer to your question:
"why not just grep the NS for everything?"
Because the NS in DNS and the NS on whois.com do not always come from the same source.
1. The old script reads the DNS NS
Usually with a query like:
NS domain.com
This only sees the DNS delegation that is active / answerable by a resolver.
2. whois.com usually shows WHOIS / RDAP nameservers
These come from registry/registrar data, not necessarily from a live DNS answer.
So you can end up in a situation like this:
- the nameservers exist in WHOIS/RDAP
- but a DNS query returns nothing / fails / comes back empty
Why does that happen?
Some common causes:
- the domain is on clientHold / serverHold
- the domain is expired / suspended
- nameservers are recorded at the registry, but the zone/delegation is not active
- a public DNS resolver cannot answer correctly
- the TLD/registry behaves differently
- the domain exists in WHOIS, but the web DNS records (A/AAAA/CNAME) are empty
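To see the mismatch for a single domain, you can query both sources side by side. This is a minimal sketch using the same stack the script below relies on (dnspython, requests, and the rdap.org aggregator); the third-party imports are kept inside the two fetch functions so the pure comparison helper works on its own:

```python
def get_dns_ns(domain, timeout=2):
    """Nameservers as seen by a live resolver (may be empty on hold/suspend)."""
    import dns.resolver  # dnspython
    resolver = dns.resolver.Resolver()
    resolver.timeout = resolver.lifetime = timeout
    try:
        return sorted(str(r).rstrip(".").lower() for r in resolver.resolve(domain, "NS"))
    except Exception:
        return []

def get_rdap_ns(domain, timeout=8):
    """Nameservers as recorded at the registry, via the rdap.org aggregator."""
    import requests
    r = requests.get(f"https://rdap.org/domain/{domain}",
                     headers={"Accept": "application/json"}, timeout=timeout)
    if r.status_code != 200:
        return []
    names = {str(ns.get("ldhName") or "").rstrip(".").lower()
             for ns in r.json().get("nameservers", []) if isinstance(ns, dict)}
    return sorted(names - {""})

def compare_ns(dns_ns, rdap_ns):
    """Pure comparison of the two views, so mismatches stand out."""
    return {
        "only_in_rdap": sorted(set(rdap_ns) - set(dns_ns)),
        "only_in_dns": sorted(set(dns_ns) - set(rdap_ns)),
        "agree": sorted(set(dns_ns) & set(rdap_ns)),
    }
```

For a held or suspended domain you would typically see names in only_in_rdap while get_dns_ns returns an empty list.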
So, in conclusion:
If you want results close to whois.com, do not take NS from DNS alone.
You need NS from 2 sources:
- dns_ns → nameservers from live DNS
- rdap_ns → nameservers from the registry / WHOIS / RDAP
This is far more useful for domain hunting.
The best solution for your needs
I suggest a dedicated script.
Main focus:
- find domains with dns_error = NO_RECORDS
- stay fast
- only fetch RDAP for the match/no-records cases
- collect: dns_ns, rdap_ns, created_at, expires_at, age_days, days_to_expire, rdap_status
So it:
- stays fast
- is richer in data
- has more accurate nameservers
- is suitable for shortlisting domains to buy / offer
Install
pip install requests dnspython
New script
Save it as:
find_norecords_enriched.py
import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse
import dns.resolver
import requests
STOP_EVENT = threading.Event()
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_RDAP_TIMEOUT = 8
DEFAULT_WORKERS = 60
FIELDNAMES = [
"checked_at",
"domain",
"match",
"bucket",
"page_type",
"dns_ok",
"dns_error",
"A",
"AAAA",
"CNAME",
"dns_ns",
"rdap_ns",
"rdap_status",
"created_at",
"expires_at",
"age_days",
"days_to_expire",
"notes",
"rdap_error",
]
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
def now_utc():
return datetime.now(timezone.utc).isoformat()
def parse_dt(value):
if not value:
return None
v = str(value).strip()
if not v:
return None
v = v.replace("Z", "+00:00")
try:
return datetime.fromisoformat(v)
except Exception:
pass
fmts = [
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d %H:%M:%S%z",
"%Y-%m-%d",
]
for fmt in fmts:
try:
dt = datetime.strptime(v, fmt)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
continue
return None
def dt_to_str(dt):
if not dt:
return ""
try:
return dt.astimezone(timezone.utc).isoformat()
except Exception:
return str(dt)
def days_between_utc(start_dt, end_dt):
if not start_dt or not end_dt:
return ""
try:
a = start_dt.astimezone(timezone.utc)
b = end_dt.astimezone(timezone.utc)
return (b - a).days
except Exception:
return ""
def normalize_domain(raw):
s = raw.strip()
if not s or s.startswith("#"):
return ""
s = s.split("#", 1)[0].strip()
if not s:
return ""
if "://" not in s:
s = "http://" + s
try:
p = urlparse(s)
host = p.netloc or p.path
host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
if host.startswith("www."):
host = host[4:]
return host
except Exception:
return ""
def load_domains(input_file):
domains = []
seen = set()
with open(input_file, "r", encoding="utf-8") as f:
for line in f:
d = normalize_domain(line)
if d and d not in seen:
seen.add(d)
domains.append(d)
return domains
def load_processed_domains(csv_path):
processed = set()
if not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0:
return processed
try:
with open(csv_path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
d = (row.get("domain") or "").strip().lower()
if d:
processed.add(d)
except Exception:
pass
return processed
def remove_if_exists(path):
try:
if os.path.exists(path):
os.remove(path)
except Exception:
pass
def atomic_write_text(path, text):
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=os.path.dirname(path))
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(text)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
finally:
try:
if os.path.exists(tmp):
os.remove(tmp)
except Exception:
pass
class SafeCsvWriter:
def __init__(self, path, fieldnames):
self.path = path
os.makedirs(os.path.dirname(path), exist_ok=True)
file_exists = os.path.exists(path) and os.path.getsize(path) > 0
self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
if not file_exists:
self.writer.writeheader()
self.f.flush()
os.fsync(self.f.fileno())
def writerow(self, row):
self.writer.writerow(row)
self.f.flush()
os.fsync(self.f.fileno())
def close(self):
try:
self.f.close()
except Exception:
pass
class SafeLineWriter:
def __init__(self, path):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.f = open(path, "a", encoding="utf-8", buffering=1)
def write_line(self, text):
self.f.write(text.rstrip("\n") + "\n")
self.f.flush()
os.fsync(self.f.fileno())
def close(self):
try:
self.f.close()
except Exception:
pass
def signal_handler(signum, frame):
if not STOP_EVENT.is_set():
STOP_EVENT.set()
        print(f"\n{C_YELLOW}Signal received. Not submitting new jobs...{C_RESET}")
else:
raise KeyboardInterrupt
def get_dns_info(domain, dns_timeout=2):
result = {
"dns_ok": False,
"dns_error": "",
"A": [],
"AAAA": [],
"CNAME": [],
"NS": [],
}
resolver = dns.resolver.Resolver()
resolver.timeout = dns_timeout
resolver.lifetime = dns_timeout
    # check web records first
for rtype in ["A", "AAAA", "CNAME"]:
try:
answers = resolver.resolve(domain, rtype)
vals = []
for r in answers:
if hasattr(r, "target"):
vals.append(str(r.target).rstrip("."))
else:
vals.append(str(r).rstrip("."))
result[rtype] = vals
except dns.resolver.NXDOMAIN:
result["dns_error"] = "NXDOMAIN"
            # on NXDOMAIN there is no point querying NS; return early
return result
except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
pass
except Exception as e:
if not result["dns_error"]:
result["dns_error"] = type(e).__name__
if any(result[k] for k in ["A", "AAAA", "CNAME"]):
result["dns_ok"] = True
elif not result["dns_error"]:
result["dns_error"] = "NO_RECORDS"
    # always try to fetch the DNS NS as well
try:
answers = resolver.resolve(domain, "NS")
result["NS"] = [str(x).rstrip(".") for x in answers]
except Exception:
pass
return result
def extract_event_date(events, wanted_actions):
if not isinstance(events, list):
return None
for action in wanted_actions:
for ev in events:
if not isinstance(ev, dict):
continue
act = str(ev.get("eventAction", "")).strip().lower()
if act == action.lower():
dt = parse_dt(ev.get("eventDate"))
if dt:
return dt
return None
def rdap_lookup(domain, timeout=8):
url = f"https://rdap.org/domain/{domain}"
out = {
"rdap_ns": [],
"rdap_status": [],
"created_at": None,
"expires_at": None,
"rdap_error": "",
}
try:
r = requests.get(
url,
timeout=timeout,
headers={"User-Agent": "Mozilla/5.0", "Accept": "application/json"},
)
if r.status_code != 200:
out["rdap_error"] = f"HTTP_{r.status_code}"
return out
data = r.json()
# status
status = data.get("status", [])
if isinstance(status, list):
out["rdap_status"] = [str(x) for x in status if str(x).strip()]
# nameservers
nameservers = data.get("nameservers", [])
ns_list = []
if isinstance(nameservers, list):
for ns in nameservers:
if isinstance(ns, dict):
name = ns.get("ldhName") or ns.get("unicodeName")
if name:
ns_list.append(str(name).rstrip("."))
elif isinstance(ns, str):
ns_list.append(ns.rstrip("."))
out["rdap_ns"] = sorted(set(ns_list))
# dates
events = data.get("events", [])
out["created_at"] = extract_event_date(events, [
"registration",
"registered",
"created",
])
out["expires_at"] = extract_event_date(events, [
"expiration",
"expired",
"expiry",
"expires",
"renewal",
])
return out
except requests.exceptions.Timeout:
out["rdap_error"] = "Timeout"
return out
except Exception as e:
out["rdap_error"] = type(e).__name__
return out
def classify_row(domain, dns_info, rdap_info):
now = datetime.now(timezone.utc)
created_at = rdap_info.get("created_at")
expires_at = rdap_info.get("expires_at")
age_days = days_between_utc(created_at, now)
days_to_expire = days_between_utc(now, expires_at)
dns_ns = dns_info.get("NS", [])
rdap_ns = rdap_info.get("rdap_ns", [])
rdap_status = rdap_info.get("rdap_status", [])
notes = []
if dns_info["dns_ok"] is False and dns_info["dns_error"] == "NO_RECORDS":
match = "YES"
bucket = "OFFLINE"
page_type = "CONNECTION FAILED"
        if dns_ns:
            notes.append("DNS NS present")
        if rdap_ns:
            notes.append("RDAP/WHOIS NS present")
        if rdap_ns and not dns_ns:
            notes.append("NS visible in RDAP, but DNS NS not readable")
        if not dns_ns and not rdap_ns:
            notes.append("No NS visible via DNS or RDAP")
elif dns_info["dns_error"] == "NXDOMAIN":
match = "NO"
bucket = "OFFLINE"
page_type = "NXDOMAIN"
        notes.append("Domain does not resolve / possibly not registered")
elif dns_info["dns_ok"]:
match = "NO"
bucket = "HAS_DNS"
page_type = "HAS_DNS_RECORD"
        notes.append("Has A/AAAA/CNAME records")
else:
match = "NO"
bucket = "OTHER"
page_type = "DNS_NOT_MATCH"
        notes.append("Not an exact NO_RECORDS")
if rdap_status:
low = " | ".join([x.lower() for x in rdap_status])
if "hold" in low:
            notes.append("Has a hold status")
if "redemption" in low:
notes.append("Redemption period")
if "pending delete" in low:
notes.append("Pending delete")
if expires_at and isinstance(days_to_expire, int):
if days_to_expire < 0:
            notes.append("Already expired")
elif days_to_expire <= 30:
            notes.append("Close to expiry")
return {
"checked_at": now_utc(),
"domain": domain,
"match": match,
"bucket": bucket,
"page_type": page_type,
"dns_ok": dns_info["dns_ok"],
"dns_error": dns_info["dns_error"],
"A": ", ".join(dns_info["A"]),
"AAAA": ", ".join(dns_info["AAAA"]),
"CNAME": ", ".join(dns_info["CNAME"]),
"dns_ns": ", ".join(dns_ns),
"rdap_ns": ", ".join(rdap_ns),
"rdap_status": ", ".join(rdap_status),
"created_at": dt_to_str(created_at),
"expires_at": dt_to_str(expires_at),
"age_days": age_days,
"days_to_expire": days_to_expire,
"notes": "; ".join(notes),
"rdap_error": rdap_info.get("rdap_error", ""),
}
def process_domain(domain, dns_timeout, rdap_timeout, skip_rdap):
dns_info = get_dns_info(domain, dns_timeout=dns_timeout)
rdap_info = {
"rdap_ns": [],
"rdap_status": [],
"created_at": None,
"expires_at": None,
"rdap_error": "",
}
    # only enrich via RDAP when:
    # - the domain is NO_RECORDS, or
    # - the DNS NS is empty (so we try to get it from RDAP instead)
if not skip_rdap:
if dns_info["dns_error"] == "NO_RECORDS" or not dns_info["NS"]:
rdap_info = rdap_lookup(domain, timeout=rdap_timeout)
return classify_row(domain, dns_info, rdap_info)
def writer_loop(result_queue, output_dir, total_input, skipped_resume):
os.makedirs(output_dir, exist_ok=True)
all_csv = os.path.join(output_dir, "all_checked.csv")
matched_csv = os.path.join(output_dir, "matched_norecords.csv")
matched_txt = os.path.join(output_dir, "matched_norecords.txt")
others_csv = os.path.join(output_dir, "others.csv")
summary_txt = os.path.join(output_dir, "summary.txt")
progress_log = os.path.join(output_dir, "progress.log")
all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
matched_writer = SafeCsvWriter(matched_csv, FIELDNAMES)
others_writer = SafeCsvWriter(others_csv, FIELDNAMES)
matched_txt_writer = SafeLineWriter(matched_txt)
log_writer = SafeLineWriter(progress_log)
counts_match = Counter()
counts_type = Counter()
processed_now = 0
def write_summary():
lines = []
lines.append("FIND NO_RECORDS ENRICHED SUMMARY")
lines.append("=" * 40)
lines.append(f"generated_at : {now_utc()}")
lines.append(f"total_input : {total_input}")
lines.append(f"skipped_resume : {skipped_resume}")
lines.append(f"processed_now : {processed_now}")
lines.append(f"remaining_est : {max(total_input - skipped_resume - processed_now, 0)}")
lines.append("")
lines.append("MATCH COUNTS")
lines.append("-" * 40)
lines.append(f"MATCH YES : {counts_match.get('YES', 0)}")
lines.append(f"MATCH NO : {counts_match.get('NO', 0)}")
lines.append("")
lines.append("PAGE TYPE COUNTS")
lines.append("-" * 40)
for k, v in counts_type.most_common():
lines.append(f"{k:25}: {v}")
atomic_write_text(summary_txt, "\n".join(lines) + "\n")
log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
write_summary()
while True:
item = result_queue.get()
if item is None:
break
row = item
domain = row["domain"]
all_writer.writerow(row)
if row["match"] == "YES":
matched_writer.writerow(row)
matched_txt_writer.write_line(domain)
else:
others_writer.writerow(row)
counts_match[row["match"]] += 1
counts_type[row["page_type"]] += 1
processed_now += 1
write_summary()
log_writer.write_line(
f"DONE {now_utc()} {domain} match={row['match']} "
f"dns_error={row['dns_error']} dns_ns={row['dns_ns']} rdap_ns={row['rdap_ns']}"
)
color = C_GREEN if row["match"] == "YES" else C_RED
print(
f"[{processed_now}] "
f"{domain:30} -> {color}{row['match']}{C_RESET} | "
f"{row['dns_error'][:15]:15} | "
f"dns_ns={row['dns_ns'][:25]:25} | "
f"rdap_ns={row['rdap_ns'][:25]:25} | "
f"expire_in={str(row['days_to_expire'])[:6]}"
)
write_summary()
log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")
all_writer.close()
matched_writer.close()
others_writer.close()
matched_txt_writer.close()
log_writer.close()
def worker(domain, args, result_queue):
if STOP_EVENT.is_set():
return
try:
row = process_domain(
domain=domain,
dns_timeout=args.dns_timeout,
rdap_timeout=args.rdap_timeout,
skip_rdap=args.skip_rdap,
)
except Exception as e:
row = {
"checked_at": now_utc(),
"domain": domain,
"match": "NO",
"bucket": "OTHER",
"page_type": "SCRIPT ERROR",
"dns_ok": "",
"dns_error": type(e).__name__,
"A": "",
"AAAA": "",
"CNAME": "",
"dns_ns": "",
"rdap_ns": "",
"rdap_status": "",
"created_at": "",
"expires_at": "",
"age_days": "",
"days_to_expire": "",
"notes": str(e),
"rdap_error": "",
}
result_queue.put(row)
def parse_args():
parser = argparse.ArgumentParser(
        description="Find domains with dns_error=NO_RECORDS and enrich them with NS/age via RDAP"
)
    parser.add_argument("-i", "--input", required=True, help="Input file of domains")
    parser.add_argument("-o", "--output", default="norecords_results", help="Output folder")
    parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Number of workers")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout (seconds)")
    parser.add_argument("--rdap-timeout", type=int, default=DEFAULT_RDAP_TIMEOUT, help="RDAP timeout (seconds)")
    parser.add_argument("--skip-rdap", action="store_true", help="Skip RDAP enrichment")
    parser.add_argument("--fresh", action="store_true", help="Delete previous results")
    parser.add_argument("--no-resume", action="store_true", help="Do not resume")
return parser.parse_args()
def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
args = parse_args()
os.makedirs(args.output, exist_ok=True)
all_csv = os.path.join(args.output, "all_checked.csv")
if args.fresh:
for f in [
"all_checked.csv",
"matched_norecords.csv",
"matched_norecords.txt",
"others.csv",
"summary.txt",
"progress.log",
]:
remove_if_exists(os.path.join(args.output, f))
domains = load_domains(args.input)
if not domains:
        print("No valid domains in the input file.")
sys.exit(1)
processed = set()
if not args.no_resume:
processed = load_processed_domains(all_csv)
queue_domains = [d for d in domains if d not in processed]
    print(f"{C_CYAN}Total input     : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Already done    : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}To process      : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers         : {args.workers}{C_RESET}")
    print(f"{C_CYAN}DNS timeout     : {args.dns_timeout}{C_RESET}")
    print(f"{C_CYAN}RDAP timeout    : {args.rdap_timeout}{C_RESET}")
    print(f"{C_CYAN}Skip RDAP       : {args.skip_rdap}{C_RESET}")
    print(f"{C_CYAN}Output folder   : {args.output}{C_RESET}")
print("-" * 120)
result_queue = Queue()
writer_thread = threading.Thread(
target=writer_loop,
args=(result_queue, args.output, len(domains), len(processed)),
daemon=True,
)
writer_thread.start()
executor = ThreadPoolExecutor(max_workers=args.workers)
try:
futures = []
for domain in queue_domains:
if STOP_EVENT.is_set():
break
futures.append(executor.submit(worker, domain, args, result_queue))
for f in futures:
if STOP_EVENT.is_set():
break
try:
f.result()
except Exception:
pass
except KeyboardInterrupt:
STOP_EVENT.set()
        print(f"\n{C_RED}Stopped by user.{C_RESET}")
finally:
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
result_queue.put(None)
writer_thread.join(timeout=10)
print("-" * 120)
    print(f"{C_GREEN}Finished / stopped safely.{C_RESET}")
print(f"All checked : {os.path.join(args.output, 'all_checked.csv')}")
print(f"Matched CSV : {os.path.join(args.output, 'matched_norecords.csv')}")
print(f"Matched TXT : {os.path.join(args.output, 'matched_norecords.txt')}")
print(f"Others CSV : {os.path.join(args.output, 'others.csv')}")
print(f"Summary : {os.path.join(args.output, 'summary.txt')}")
if __name__ == "__main__":
main()
Usage
Normal
python3 find_norecords_enriched.py -i domains.txt -o norecords_scan
Faster
python3 find_norecords_enriched.py -i domains.txt -o norecords_scan -w 80 --dns-timeout 1
For a super-fast DNS-only run
python3 find_norecords_enriched.py -i domains.txt -o norecords_scan -w 100 --dns-timeout 1 --skip-rdap
Start over from scratch
python3 find_norecords_enriched.py -i domains.txt -o norecords_scan --fresh
Important output
matched_norecords.csv
This is the most important one for you.
It contains the domains that match:
- dns_ok = False
- dns_error = NO_RECORDS
- labeled with: bucket = OFFLINE, page_type = CONNECTION FAILED
plus the extra fields:
dns_ns, rdap_ns, rdap_status, created_at, expires_at, age_days, days_to_expire
matched_norecords.txt
Just the list of domains.
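As a hypothetical post-processing step (not part of the script itself), you could load matched_norecords.csv and rank the matches by how soon they expire; the column names below are the ones the script writes:

```python
import csv

def top_expiring(path, limit=10):
    """Return the `limit` matched rows that expire soonest."""
    rows = []
    with open(path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            try:
                row["days_to_expire"] = int(row["days_to_expire"])
            except (KeyError, ValueError):
                continue  # RDAP gave no expiry date for this row
            rows.append(row)
    rows.sort(key=lambda r: r["days_to_expire"])
    return rows[:limit]
```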
Why is this version better suited for domain hunting?
Because now you can see things like:
example scenario 1
dns_error = NO_RECORDS, dns_ns empty, rdap_ns present, days_to_expire = 5
Meaning:
- the domain is still registered
- there are no web records
- the registrar/registry still has nameservers on file
- possibly worth monitoring / shortlisting
example scenario 2
dns_error = NO_RECORDS, rdap_status = client hold, server hold, rdap_ns present
Meaning:
- this is not a free domain
- it may be suspended / a billing issue / a registry issue
example scenario 3
NXDOMAIN, and the rdap_error lookup also fails
Meaning:
- most likely genuinely inactive / not resolving / possibly already dropped
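Those three readings can be condensed into a small helper. The labels are my own shorthand, not part of the script; the field names match the CSV columns it writes:

```python
def label_scenario(row):
    """Map one result row (dict of strings) to a rough, hypothetical label."""
    status = row.get("rdap_status", "").lower()
    if row.get("dns_error") == "NXDOMAIN" and row.get("rdap_error"):
        return "likely dropped / not registered"
    if row.get("dns_error") == "NO_RECORDS" and "hold" in status:
        return "registered but on hold, not freely available"
    if row.get("dns_error") == "NO_RECORDS" and row.get("rdap_ns") and not row.get("dns_ns"):
        return "registered, no live DNS: monitor / shortlist"
    return "other"
```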
Why didn't I just "grep the NS for everything" in the old script?
Because the old script was optimized for:
- fast domain-status classification
- not registration-data enrichment
- it only fetched the live DNS NS
- not WHOIS/RDAP
For your needs, what actually matters is:
- the NS from RDAP/WHOIS
- the creation date
- the expiry date
- hold/redemption statuses
- a buying shortlist
So this version really is the better fit.
An important note on accuracy
RDAP is quite good, but it still has limits:
- not every TLD provides complete data
- some ccTLDs behave oddly
- some registries rate-limit requests
- created_at / expires_at are sometimes empty
- RDAP can list nameservers even when live DNS does not answer
But that last point is exactly what you need for the analysis.
Recommended workflow
For your use case, I suggest this:
Mode 1: fast general scan
Use:
domain_status_fast.py
Mode 2: find no-record candidates
Use:
find_norecords_enriched.py
So:
- scan a large domain list
- take the NO_RECORDS ones
- look at: rdap_ns, created_at, expires_at, days_to_expire
- shortlist them for buying / offering
If you want, I can go one step further:
build a shortlist-filter script
that automatically picks domains with rules such as:
- dns_error = NO_RECORDS
- days_to_expire <= 30
- age_days >= 365
- rdap_status contains hold, or is empty
- output to shortlist.csv
If you want, I can build that too.
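As a rough first cut of those rules (assuming one row from matched_norecords.csv, with the thresholds taken from the list above), the filter could start out like this:

```python
def is_shortlist_candidate(row):
    """First-cut shortlist rules; `row` is one CSV row as a dict of strings."""
    if row.get("dns_error") != "NO_RECORDS":
        return False
    try:
        if int(row["days_to_expire"]) > 30:
            return False
        if int(row["age_days"]) < 365:
            return False
    except (KeyError, ValueError):
        return False  # missing RDAP dates -> cannot apply the rules
    status = row.get("rdap_status", "").strip().lower()
    return "hold" in status or status == ""
```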