feat(dns): add dnscrypt and dns over tcp

This commit is contained in:
2026-02-04 22:08:05 +00:00
parent 5d9b630d13
commit 92351a80a9
12 changed files with 2576 additions and 568 deletions

View File

@@ -0,0 +1,369 @@
package main
import (
"encoding/csv"
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
)
// QueryRecord models one row of a per-query DNS benchmark CSV. The first
// fourteen fields are read straight from the CSV (see readCSV); the
// trailing bandwidth fields are computed later from the matching PCAP
// capture by enrichRecords.
type QueryRecord struct {
	Domain            string  // queried domain name ("domain" column)
	QueryType         string  // DNS record type, e.g. "A" ("query_type")
	Protocol          string  // transport label ("protocol")
	DNSSec            string  // "dnssec" column, stored verbatim
	AuthDNSSec        string  // "auth_dnssec" column, stored verbatim
	KeepAlive         string  // "keep_alive" column, stored verbatim
	DNSServer         string  // resolver the query was sent to ("dns_server")
	Timestamp         string  // query start time, RFC3339Nano ("timestamp")
	DurationNs        int64   // query duration in nanoseconds
	DurationMs        float64 // same duration in milliseconds
	RequestSizeBytes  int     // DNS payload size of the request
	ResponseSizeBytes int     // DNS payload size of the response
	ResponseCode      string  // DNS response code ("response_code")
	Error             string  // error message, empty on success
	// PCAP-derived bandwidth metrics, zero until enrichRecords runs.
	BytesSent         int64 // wire bytes sent toward the server
	BytesReceived     int64 // wire bytes received from the server
	PacketsSent       int64 // packet count, outbound
	PacketsReceived   int64 // packet count, inbound
	TotalBytes        int64 // BytesSent + BytesReceived
}
// parseRFC3339Nano parses ts as an RFC3339 timestamp with optional
// fractional seconds (the time.RFC3339Nano layout).
func parseRFC3339Nano(ts string) (time.Time, error) {
	const layout = time.RFC3339Nano
	return time.Parse(layout, ts)
}
// processProviderFolder enriches every CSV in providerPath that has a
// sibling PCAP capture. A file is skipped when its PCAP twin is missing,
// or when a ".csv.bak" backup already marks it as processed. Per-file
// errors are logged and counted but do not abort the folder; the returned
// error covers only the directory listing itself.
func processProviderFolder(providerPath string) error {
	providerName := filepath.Base(providerPath)
	fmt.Printf("\n=== Processing provider: %s ===\n", providerName)
	files, err := os.ReadDir(providerPath)
	if err != nil {
		return err
	}
	processed := 0
	skipped := 0
	errors := 0
	for _, file := range files {
		if !strings.HasSuffix(file.Name(), ".csv") {
			continue
		}
		csvPath := filepath.Join(providerPath, file.Name())
		// Swap only the ".csv" suffix for ".pcap". The previous
		// strings.Replace(csvPath, ".csv", ".pcap", 1) rewrote the FIRST
		// ".csv" occurrence anywhere in the path, which breaks when a
		// directory or file name contains ".csv" mid-string.
		pcapPath := strings.TrimSuffix(csvPath, ".csv") + ".pcap"
		// Check if PCAP exists
		if _, err := os.Stat(pcapPath); os.IsNotExist(err) {
			fmt.Printf(" ⊗ Skipping: %s (no matching PCAP)\n", file.Name())
			skipped++
			continue
		}
		// Check if already processed (has backup)
		backupPath := csvPath + ".bak"
		if _, err := os.Stat(backupPath); err == nil {
			fmt.Printf(" ⊙ Skipping: %s (already processed, backup exists)\n", file.Name())
			skipped++
			continue
		}
		fmt.Printf(" ↻ Processing: %s ... ", file.Name())
		if err := processPair(csvPath, pcapPath); err != nil {
			fmt.Printf("ERROR\n")
			log.Printf(" Error: %v\n", err)
			errors++
		} else {
			fmt.Printf("✓\n")
			processed++
		}
	}
	fmt.Printf(" Summary: %d processed, %d skipped, %d errors\n", processed, skipped, errors)
	return nil
}
// processPair backs up csvPath to csvPath+".bak", then rewrites csvPath in
// place with bandwidth columns computed from the packets in pcapPath.
func processPair(csvPath, pcapPath string) error {
	// Snapshot the original CSV before touching it.
	backupPath := csvPath + ".bak"
	original, err := os.ReadFile(csvPath)
	if err != nil {
		return fmt.Errorf("backup read failed: %w", err)
	}
	if err := os.WriteFile(backupPath, original, 0644); err != nil {
		return fmt.Errorf("backup write failed: %w", err)
	}

	// Load the query records; an empty file is treated as an error so the
	// caller surfaces it instead of silently writing a header-only CSV.
	records, err := readCSV(csvPath)
	if err != nil {
		return fmt.Errorf("CSV read failed: %w", err)
	}
	if len(records) == 0 {
		return fmt.Errorf("no records in CSV")
	}

	// Load the capture, attach bandwidth metrics, and write everything back.
	packets, err := readPCAPGo(pcapPath)
	if err != nil {
		return fmt.Errorf("PCAP read failed: %w", err)
	}
	enrichRecords(records, packets)
	if err := writeCSV(csvPath, records); err != nil {
		return fmt.Errorf("CSV write failed: %w", err)
	}
	return nil
}
// readCSV loads the benchmark CSV at path, skipping the header row and any
// data row with fewer than the 14 expected columns. Numeric cells that
// fail to parse are left at their zero value: failed queries may have
// empty cells and a partial record is still useful.
func readCSV(path string) ([]*QueryRecord, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	r := csv.NewReader(f)
	// Allow a variable number of fields per record. csv.Reader otherwise
	// enforces a uniform field count, so ReadAll would fail outright on the
	// first short row and the per-row length check below could never run.
	r.FieldsPerRecord = -1
	rows, err := r.ReadAll()
	if err != nil {
		return nil, err
	}
	if len(rows) < 2 {
		return nil, fmt.Errorf("CSV has no data rows")
	}
	records := make([]*QueryRecord, 0, len(rows)-1)
	for i := 1; i < len(rows); i++ {
		row := rows[i]
		if len(row) < 14 {
			log.Printf(" Warning: Skipping malformed row %d", i+1)
			continue
		}
		// Parse errors deliberately ignored: keep zero values (see above).
		durationNs, _ := strconv.ParseInt(row[8], 10, 64)
		durationMs, _ := strconv.ParseFloat(row[9], 64)
		reqSize, _ := strconv.Atoi(row[10])
		respSize, _ := strconv.Atoi(row[11])
		records = append(records, &QueryRecord{
			Domain:            row[0],
			QueryType:         row[1],
			Protocol:          row[2],
			DNSSec:            row[3],
			AuthDNSSec:        row[4],
			KeepAlive:         row[5],
			DNSServer:         row[6],
			Timestamp:         row[7],
			DurationNs:        durationNs,
			DurationMs:        durationMs,
			RequestSizeBytes:  reqSize,
			ResponseSizeBytes: respSize,
			ResponseCode:      row[12],
			Error:             row[13],
		})
	}
	return records, nil
}
// PacketInfo is the minimal view of one captured packet needed for
// bandwidth accounting in enrichRecords.
type PacketInfo struct {
	Timestamp time.Time // capture time from the PCAP metadata
	Size      int       // total frame length in bytes
	IsSent    bool      // true when destined for a DNS service port (see readPCAPGo)
}
// readPCAPGo reads every packet from the PCAP file at path and keeps those
// whose UDP or TCP port pair looks like DNS traffic: 53 (plain DNS),
// 853 (DoT/DoQ) or 443 (DoH/DoH3). A packet counts as "sent" when its
// destination is one of those service ports.
func readPCAPGo(path string) ([]PacketInfo, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	reader, err := pcapgo.NewReader(f)
	if err != nil {
		return nil, err
	}

	// servicePort reports whether p is one of the DNS-related ports.
	servicePort := func(p uint16) bool {
		return p == 53 || p == 853 || p == 443
	}

	var kept []PacketInfo
	source := gopacket.NewPacketSource(reader, reader.LinkType())
	for pkt := range source.Packets() {
		if pkt.NetworkLayer() == nil {
			continue
		}
		matched := false
		outbound := false
		// UDP carries classic DNS, DoQ and DoH3.
		if layer := pkt.Layer(layers.LayerTypeUDP); layer != nil {
			udp := layer.(*layers.UDP)
			src, dst := uint16(udp.SrcPort), uint16(udp.DstPort)
			matched = servicePort(src) || servicePort(dst)
			outbound = servicePort(dst)
		}
		// TCP carries DoT and DoH. As in the original, a TCP layer's
		// classification overrides the UDP one.
		if layer := pkt.Layer(layers.LayerTypeTCP); layer != nil {
			tcp := layer.(*layers.TCP)
			src, dst := uint16(tcp.SrcPort), uint16(tcp.DstPort)
			matched = servicePort(src) || servicePort(dst)
			outbound = servicePort(dst)
		}
		if matched {
			kept = append(kept, PacketInfo{
				Timestamp: pkt.Metadata().Timestamp,
				Size:      len(pkt.Data()),
				IsSent:    outbound,
			})
		}
	}
	return kept, nil
}
// enrichRecords fills in the bandwidth fields of each record by summing
// every packet whose capture time falls inside the half-open window
// [timestamp, timestamp+duration). Records with unparseable timestamps
// are left untouched. Runs in O(len(records) * len(packets)).
func enrichRecords(records []*QueryRecord, packets []PacketInfo) {
	for _, rec := range records {
		start, err := parseRFC3339Nano(rec.Timestamp)
		if err != nil {
			log.Printf(" Warning: Failed to parse timestamp: %s", rec.Timestamp)
			continue
		}
		end := start.Add(time.Duration(rec.DurationNs))

		var bytesOut, bytesIn, pktsOut, pktsIn int64
		for _, pkt := range packets {
			// Inclusive at the start, exclusive at the end.
			if pkt.Timestamp.Before(start) || !pkt.Timestamp.Before(end) {
				continue
			}
			if pkt.IsSent {
				bytesOut += int64(pkt.Size)
				pktsOut++
			} else {
				bytesIn += int64(pkt.Size)
				pktsIn++
			}
		}
		rec.BytesSent = bytesOut
		rec.BytesReceived = bytesIn
		rec.PacketsSent = pktsOut
		rec.PacketsReceived = pktsIn
		rec.TotalBytes = bytesOut + bytesIn
	}
}
// writeCSV overwrites path with a header row plus one row per record,
// including the five PCAP-derived bandwidth columns.
//
// The original deferred w.Flush() and never checked w.Error(), so buffered
// write failures (e.g. a full disk) were silently dropped; flush and close
// errors are now surfaced to the caller.
func writeCSV(path string, records []*QueryRecord) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Safety net for early returns; the explicit Close at the end reports
	// the real close error (the deferred second Close is a harmless no-op).
	defer f.Close()
	w := csv.NewWriter(f)
	// Write header: the 14 original benchmark columns plus 5 bandwidth columns.
	header := []string{
		"domain", "query_type", "protocol", "dnssec", "auth_dnssec",
		"keep_alive", "dns_server", "timestamp", "duration_ns", "duration_ms",
		"request_size_bytes", "response_size_bytes", "response_code", "error",
		"bytes_sent", "bytes_received", "packets_sent", "packets_received", "total_bytes",
	}
	if err := w.Write(header); err != nil {
		return err
	}
	// Write data rows
	for _, rec := range records {
		row := []string{
			rec.Domain,
			rec.QueryType,
			rec.Protocol,
			rec.DNSSec,
			rec.AuthDNSSec,
			rec.KeepAlive,
			rec.DNSServer,
			rec.Timestamp,
			strconv.FormatInt(rec.DurationNs, 10),
			strconv.FormatFloat(rec.DurationMs, 'f', -1, 64),
			strconv.Itoa(rec.RequestSizeBytes),
			strconv.Itoa(rec.ResponseSizeBytes),
			rec.ResponseCode,
			rec.Error,
			strconv.FormatInt(rec.BytesSent, 10),
			strconv.FormatInt(rec.BytesReceived, 10),
			strconv.FormatInt(rec.PacketsSent, 10),
			strconv.FormatInt(rec.PacketsReceived, 10),
			strconv.FormatInt(rec.TotalBytes, 10),
		}
		if err := w.Write(row); err != nil {
			return err
		}
	}
	// csv.Writer buffers internally: flush, surface any deferred write
	// error, then report close failures too.
	w.Flush()
	if err := w.Error(); err != nil {
		return err
	}
	return f.Close()
}
// main walks results/<provider> for each measured DNS provider and
// enriches every CSV in place with bandwidth metrics from its PCAP twin.
func main() {
	resultsDir := "results"
	providers := []string{"adguard", "cloudflare", "google", "quad9"}
	fmt.Println("╔═══════════════════════════════════════════════╗")
	fmt.Println("║ DNS PCAP Preprocessor v1.0 ║")
	fmt.Println("║ Enriching ALL CSVs with bandwidth metrics ║")
	fmt.Println("╚═══════════════════════════════════════════════╝")
	// Per-file counts are reported inside processProviderFolder; only
	// whole-folder failures are tracked here. (The original also declared
	// totalProcessed and totalSkipped, which were never read and therefore
	// did not compile: "declared and not used".)
	totalErrors := 0
	for _, provider := range providers {
		providerPath := filepath.Join(resultsDir, provider)
		if _, err := os.Stat(providerPath); os.IsNotExist(err) {
			fmt.Printf("\n⚠ Provider folder not found: %s\n", provider)
			continue
		}
		if err := processProviderFolder(providerPath); err != nil {
			log.Printf("Error processing %s: %v\n", provider, err)
			totalErrors++
		}
	}
	// Surface the folder-level error count, which was previously collected
	// but never reported.
	if totalErrors > 0 {
		fmt.Printf("\n⚠ %d provider folder(s) could not be processed\n", totalErrors)
	}
	fmt.Println("\n╔═══════════════════════════════════════════════╗")
	fmt.Println("║ Preprocessing Complete! ║")
	fmt.Println("╚═══════════════════════════════════════════════╝")
	fmt.Printf("\nAll CSV files now have 5 additional columns:\n")
	fmt.Printf(" • bytes_sent - Total bytes sent to DNS server\n")
	fmt.Printf(" • bytes_received - Total bytes received from DNS server\n")
	fmt.Printf(" • packets_sent - Number of packets sent\n")
	fmt.Printf(" • packets_received - Number of packets received\n")
	fmt.Printf(" • total_bytes - Sum of sent + received bytes\n")
	fmt.Printf("\n📁 Backups saved as: *.csv.bak\n")
	fmt.Printf("\n💡 Tip: The analysis script will filter which files to visualize,\n")
	fmt.Printf(" but all files now have complete bandwidth metrics!\n")
}

View File

@@ -1,250 +1,362 @@
#!/usr/bin/env python3
"""
Add network metrics from PCAP files to DNS CSV files.
Adds: raw_bytes_total, raw_packet_count, overhead_bytes, efficiency_percent
Fast PCAP Preprocessor for DNS QoS Analysis
Loads PCAP into memory first, then uses binary search for matching.
Uses LAN IP to determine direction (LAN = sent, non-LAN = received).
"""
import csv
import os
import argparse
import re
import shutil
from pathlib import Path
from datetime import datetime, timezone
from scapy.all import rdpcap
from typing import Dict, List, NamedTuple
import time
def parse_timestamp(ts_str):
    """Parse an RFC3339Nano timestamp into (datetime, nanoseconds).

    Accepts both a numeric UTC offset (``+00:00`` / ``-05:00``) and the
    ``Z`` suffix (which Go's RFC3339Nano formatter emits for UTC), and
    tolerates a missing fractional-seconds part.

    Args:
        ts_str: timestamp string, e.g. "2024-01-02T15:04:05.123456789Z".

    Returns:
        (datetime, int): the parsed timestamp (microsecond precision) and
        the fractional part expanded to a full nanosecond count.

    Raises:
        ValueError: if ts_str is not an RFC3339-shaped timestamp.
    """
    match = re.match(
        r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?(Z|[\+\-]\d{2}:\d{2})',
        ts_str
    )
    if not match:
        raise ValueError(f"Invalid timestamp format: {ts_str}")
    base, nanos, tz = match.groups()
    # Normalize the two generalized cases back to the original code path.
    nanos = nanos or '0'
    if tz == 'Z':
        tz = '+00:00'
    # datetime carries only microseconds: truncate (or right-pad) to 6 digits.
    micros = nanos[:6].ljust(6, '0')
    iso_str = f"{base}.{micros}{tz}"
    dt = datetime.fromisoformat(iso_str)
    # Expand the captured fraction to a 9-digit nanosecond count.
    full_nanos = int(nanos.ljust(9, '0'))
    return dt, full_nanos
import dpkt
from dateutil import parser as date_parser
def read_pcap(pcap_path):
"""Read PCAP and return list of (timestamp_epoch, size)."""
class Packet(NamedTuple):
    """Lightweight packet representation.

    Only the fields needed for bandwidth accounting are kept, so a whole
    capture can be held in memory cheaply.
    """
    timestamp: float  # capture time as Unix epoch seconds
    size: int  # total frame length in bytes
    is_outbound: bool  # True if from LAN, False if from internet
class QueryWindow:
    """Per-query time window [start, end] plus traffic accumulators.

    __slots__ keeps instances small since one window exists per CSV row.
    """
    __slots__ = ['index', 'start', 'end', 'sent', 'received', 'pkts_sent', 'pkts_received']

    def __init__(self, index: int, start: float, end: float):
        # Identity and epoch-second bounds of the window.
        self.index = index
        self.start = start
        self.end = end
        # Byte/packet counters, incremented as packets are matched.
        for counter in ('sent', 'received', 'pkts_sent', 'pkts_received'):
            setattr(self, counter, 0)
def parse_csv_timestamp(ts_str: str) -> float:
    """Convert RFC3339Nano timestamp to Unix epoch (seconds).

    dateutil's isoparse accepts both 'Z' and numeric offsets; precision is
    limited to what a float epoch can carry (roughly microseconds).
    """
    dt = date_parser.isoparse(ts_str)
    return dt.timestamp()
def is_lan_ip(ip_bytes: bytes) -> bool:
    """Return True when a 4-byte IPv4 address is private or loopback.

    Recognized ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 and
    127.0.0.0/8. Anything that is not exactly 4 bytes long (e.g. an IPv6
    address) is treated as non-LAN.
    """
    if len(ip_bytes) != 4:
        return False
    first, second = ip_bytes[0], ip_bytes[1]
    if first in (10, 127):
        # 10.0.0.0/8 private range, 127.0.0.0/8 loopback.
        return True
    if first == 172:
        # 172.16.0.0/12 spans second octets 16..31.
        return 16 <= second <= 31
    if first == 192:
        # 192.168.0.0/16.
        return second == 168
    return False
def load_pcap_into_memory(pcap_path: Path) -> List[Packet]:
"""Load all packets from PCAP into memory with minimal data."""
packets = []
print(f" Loading PCAP into memory...")
start_time = time.time()
try:
pkts = rdpcap(str(pcap_path))
for pkt in pkts:
timestamp = float(pkt.time)
length = len(pkt)
packets.append((timestamp, length))
with open(pcap_path, 'rb') as f:
try:
pcap = dpkt.pcap.Reader(f)
except:
# Try pcapng format
f.seek(0)
pcap = dpkt.pcapng.Reader(f)
for ts, buf in pcap:
try:
packet_time = float(ts)
packet_size = len(buf)
# Parse to get source IP
eth = dpkt.ethernet.Ethernet(buf)
# Default to outbound if we can't determine
is_outbound = True
if isinstance(eth.data, dpkt.ip.IP):
ip = eth.data
src_ip = ip.src
is_outbound = is_lan_ip(src_ip)
packets.append(Packet(
timestamp=packet_time,
size=packet_size,
is_outbound=is_outbound
))
except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError, AttributeError):
continue
except Exception as e:
print(f" Error reading PCAP: {e}")
print(f" Error reading PCAP: {e}")
return []
elapsed = time.time() - start_time
print(f" Loaded {len(packets):,} packets in {elapsed:.2f}s")
# Sort by timestamp for binary search
packets.sort(key=lambda p: p.timestamp)
return packets
def find_packets_in_window(packets, start_ts, start_nanos, duration_ns):
    """Find packets within exact time window.

    packets is a sequence of (epoch_seconds, size) pairs; the window is
    inclusive at both ends. Returns (total_bytes, packet_count).
    """
    # Epoch of the window start, adding back the sub-microsecond remainder
    # of the nanosecond counter that the datetime itself cannot carry.
    window_start = start_ts.timestamp() + (start_nanos % 1_000_000) / 1_000_000_000
    window_end = window_start + duration_ns / 1_000_000_000
    sizes = [size for ts, size in packets if window_start <= ts <= window_end]
    return sum(sizes), len(sizes)
def enhance_csv(csv_path, pcap_path, output_path, debug=False):
"""Add PCAP metrics to CSV."""
if not os.path.exists(pcap_path):
print(f"⚠️ PCAP not found: {pcap_path}")
return False
print(f"Processing: {os.path.basename(csv_path)}")
# Read PCAP
packets = read_pcap(pcap_path)
print(f" Loaded {len(packets)} packets")
def find_packets_in_window(
packets: List[Packet],
start_time: float,
end_time: float,
left_hint: int = 0
) -> tuple[List[Packet], int]:
"""
Binary search to find all packets within time window.
Returns (matching_packets, left_index_hint_for_next_search).
"""
if not packets:
print(" ❌ No packets found")
return False
return [], 0
if packets and debug:
first_pcap = packets[0][0]
last_pcap = packets[-1][0]
print(f" First PCAP packet: {first_pcap:.6f}")
print(f" Last PCAP packet: {last_pcap:.6f}")
print(f" PCAP duration: {(last_pcap - first_pcap):.3f}s")
# Binary search for first packet >= start_time
left, right = left_hint, len(packets) - 1
first_idx = len(packets)
# Read CSV
with open(csv_path, 'r', newline='') as f:
reader = csv.DictReader(f)
fieldnames = list(reader.fieldnames) + [
'raw_bytes_total',
'raw_packet_count',
'overhead_bytes',
'efficiency_percent'
]
rows = list(reader)
while left <= right:
mid = (left + right) // 2
if packets[mid].timestamp >= start_time:
first_idx = mid
right = mid - 1
else:
left = mid + 1
if rows and debug:
try:
first_ts, _ = parse_timestamp(rows[0]['timestamp'])
last_ts, _ = parse_timestamp(rows[-1]['timestamp'])
print(f" First CSV query: {first_ts.timestamp():.6f}")
print(f" Last CSV query: {last_ts.timestamp():.6f}")
offset = packets[0][0] - first_ts.timestamp()
print(f" Time offset (PCAP - CSV): {offset:.3f}s")
except:
pass
# No packets in range
if first_idx >= len(packets) or packets[first_idx].timestamp > end_time:
return [], first_idx
# Enhance rows
enhanced = []
matched = 0
# Collect all packets in window
matching = []
idx = first_idx
while idx < len(packets) and packets[idx].timestamp <= end_time:
matching.append(packets[idx])
idx += 1
for i, row in enumerate(rows):
try:
timestamp, nanos = parse_timestamp(row['timestamp'])
duration_ns = int(row['duration_ns'])
raw_bytes, packet_count = find_packets_in_window(
packets, timestamp, nanos, duration_ns
)
useful_bytes = (
int(row['request_size_bytes']) +
int(row['response_size_bytes'])
)
overhead = raw_bytes - useful_bytes
efficiency = (
(useful_bytes / raw_bytes * 100)
if raw_bytes > 0 else 0
)
row['raw_bytes_total'] = raw_bytes
row['raw_packet_count'] = packet_count
row['overhead_bytes'] = overhead
row['efficiency_percent'] = f"{efficiency:.2f}"
if raw_bytes > 0:
matched += 1
# Debug first few queries
if debug and i < 3:
print(f" Query {i}: {row['domain']}")
print(f" Duration: {duration_ns / 1e6:.3f}ms")
print(f" Matched packets: {packet_count}")
print(f" Raw bytes: {raw_bytes}")
print(f" Useful bytes: {useful_bytes}")
print(f" Efficiency: {efficiency:.2f}%")
except (ValueError, KeyError) as e:
if debug:
print(f" Error processing row {i}: {e}")
row['raw_bytes_total'] = 0
row['raw_packet_count'] = 0
row['overhead_bytes'] = 0
row['efficiency_percent'] = "0.00"
enhanced.append(row)
print(f" Matched: {matched}/{len(rows)} queries")
if matched == 0:
print(" ⚠️ WARNING: No queries matched any packets!")
print(" This might indicate timestamp misalignment.")
# Write output
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
writer.writerows(enhanced)
print(f" ✓ Saved: {output_path}")
return True
return matching, first_idx
def main():
parser = argparse.ArgumentParser(
description='Add PCAP network metrics to DNS CSV files'
)
parser.add_argument('input_dir', help='Input directory (e.g., results)')
parser.add_argument(
'--output',
default='./results_enriched',
help='Output directory (default: ./results_enriched)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Preview files without processing'
)
parser.add_argument(
'--debug',
action='store_true',
help='Show detailed timing information'
)
def load_csv_queries(csv_path: Path) -> List[Dict]:
    """Read a benchmark CSV into per-query dicts with epoch time windows.

    Each returned entry keeps the raw CSV row under 'data' and adds
    precomputed 'start_time'/'end_time' epoch floats derived from the
    timestamp and duration_ns columns. Rows that fail to parse are skipped
    with a warning.
    """
    queries = []
    with open(csv_path, 'r', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            try:
                started = parse_csv_timestamp(row['timestamp'])
                ended = started + float(row['duration_ns']) / 1e9
            except Exception as e:
                print(f" Warning: Skipping row - {e}")
                continue
            queries.append({
                'data': row,
                'start_time': started,
                'end_time': ended,
            })
    return queries
def match_packets_to_queries(
packets: List[Packet],
queries: List[Dict]
) -> List[Dict]:
"""Match packets to query windows using binary search."""
if not queries or not packets:
return queries
args = parser.parse_args()
print(f" Matching packets to queries...")
start_time = time.time()
print("=" * 60)
print("ENHANCE DNS CSVs WITH PCAP METRICS")
print("=" * 60)
print(f"Input: {args.input_dir}")
print(f"Output: {args.output}")
if args.debug:
print("Debug: ENABLED")
print()
# Initialize metrics
for q in queries:
q['bytes_sent'] = 0
q['bytes_received'] = 0
q['packets_sent'] = 0
q['packets_received'] = 0
q['total_bytes'] = 0
# Find CSV files
csv_files = list(Path(args.input_dir).rglob('*.csv'))
# Sort queries by start time for sequential processing
queries_sorted = sorted(enumerate(queries), key=lambda x: x[1]['start_time'])
if not csv_files:
print("❌ No CSV files found")
return 1
matched_packets = 0
left_hint = 0 # Optimization: start next search from here
print(f"Found {len(csv_files)} CSV files\n")
for original_idx, q in queries_sorted:
matching, left_hint = find_packets_in_window(
packets,
q['start_time'],
q['end_time'],
left_hint
)
for pkt in matching:
matched_packets += 1
if pkt.is_outbound:
q['bytes_sent'] += pkt.size
q['packets_sent'] += 1
else:
q['bytes_received'] += pkt.size
q['packets_received'] += 1
q['total_bytes'] = q['bytes_sent'] + q['bytes_received']
if args.dry_run:
print("DRY RUN - would process:")
for csv_path in csv_files:
pcap_path = csv_path.with_suffix('.pcap')
print(f" {csv_path.relative_to(args.input_dir)}")
print(f" PCAP: {'' if pcap_path.exists() else ''}")
return 0
elapsed = time.time() - start_time
print(f" Matched {matched_packets:,} packets in {elapsed:.2f}s")
# Process files
success = 0
failed = 0
# Statistics
total_sent = sum(q['bytes_sent'] for q in queries)
total_recv = sum(q['bytes_received'] for q in queries)
queries_with_data = sum(1 for q in queries if q['total_bytes'] > 0)
print(f" Total: {total_sent:,} bytes sent, {total_recv:,} bytes received")
print(f" Queries with data: {queries_with_data}/{len(queries)}")
return queries
def write_enriched_csv(
    csv_path: Path, queries: List[Dict], backup: bool = True
):
    """Write enriched CSV with bandwidth columns.

    Overwrites csv_path with the original columns plus the five bandwidth
    columns accumulated on each query dict. When backup is True, the
    pre-enrichment file is first copied to *.csv.bak; an existing backup is
    never overwritten, so the first run's pristine copy survives.
    """
    # Guard: the field-name derivation below indexes queries[0], which
    # raised IndexError on an empty query list.
    if not queries:
        print(f" Skipped (no queries): {csv_path.name}")
        return
    if backup and csv_path.exists():
        backup_path = csv_path.with_suffix('.csv.bak')
        if not backup_path.exists():  # Don't overwrite existing backup
            shutil.copy2(csv_path, backup_path)
            print(f" Backup: {backup_path.name}")
    # Column order: original CSV columns first, then the new metrics.
    original_fields = list(queries[0]['data'].keys())
    new_fields = [
        'bytes_sent',
        'bytes_received',
        'packets_sent',
        'packets_received',
        'total_bytes',
    ]
    fieldnames = original_fields + new_fields
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for q in queries:
            row = q['data'].copy()
            for field in new_fields:
                row[field] = q[field]
            writer.writerow(row)
    print(f" Written: {csv_path.name}")
def process_provider_directory(provider_path: Path):
"""Process all CSV/PCAP pairs in a provider directory."""
print(f"\n{'='*60}")
print(f"Processing: {provider_path.name.upper()}")
print(f"{'='*60}")
csv_files = sorted(provider_path.glob('*.csv'))
processed = 0
total_time = 0
for csv_path in csv_files:
pcap_path = csv_path.with_suffix('.pcap')
rel_path = csv_path.relative_to(args.input_dir)
output_path = Path(args.output) / rel_path
# Skip backup files
if '.bak' in csv_path.name:
continue
if enhance_csv(str(csv_path), str(pcap_path), str(output_path),
args.debug):
success += 1
else:
failed += 1
print()
pcap_path = csv_path.with_suffix('.pcap')
if not pcap_path.exists():
print(f"\n ⚠ Skipping {csv_path.name} - no matching PCAP")
continue
print(f"\n 📁 {csv_path.name}")
file_start = time.time()
# Load PCAP into memory first
packets = load_pcap_into_memory(pcap_path)
if not packets:
print(f" ⚠ No packets found in PCAP")
continue
# Load CSV queries
queries = load_csv_queries(csv_path)
if not queries:
print(f" ⚠ No valid queries found")
continue
print(f" Loaded {len(queries):,} queries")
# Match packets to queries
enriched_queries = match_packets_to_queries(packets, queries)
# Write enriched CSV
write_enriched_csv(csv_path, enriched_queries)
file_time = time.time() - file_start
total_time += file_time
processed += 1
print(f" ✓ Completed in {file_time:.2f}s")
# Summary
print("=" * 60)
print(f"✓ Success: {success}")
print(f"✗ Failed: {failed}")
print(f"Total: {len(csv_files)}")
print(f"\nOutput: {args.output}")
return 0 if failed == 0 else 1
print(f"\n {'='*58}")
print(f" {provider_path.name}: {processed} files in {total_time:.2f}s")
print(f" {'='*58}")
if __name__ == "__main__":
exit(main())
def main():
    """Run the PCAP-to-CSV enrichment pipeline over every provider folder."""
    overall_start = time.time()
    print("\n" + "="*60)
    print("DNS PCAP PREPROCESSOR - Memory-Optimized Edition")
    print("="*60)
    results_dir = Path('results')
    if not results_dir.exists():
        print(f"\n❌ Error: '{results_dir}' directory not found")
        return
    # One sub-directory of results/ per measured DNS provider.
    for provider in ['adguard', 'cloudflare', 'google', 'quad9']:
        provider_path = results_dir / provider
        if not provider_path.exists():
            print(f"\n⚠ Warning: Provider directory not found: {provider}")
            continue
        process_provider_directory(provider_path)
    overall_time = time.time() - overall_start
    print("\n" + "="*60)
    print(f"✓ PREPROCESSING COMPLETE")
    print(f" Total time: {overall_time:.2f}s ({overall_time/60:.1f} minutes)")
    print("="*60 + "\n")


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
Convert DNS CSV files to SQLite database.
Creates a single normalized table with unified DNSSEC handling.
"""
import sqlite3
import csv
from pathlib import Path
from dateutil import parser as date_parser
def create_database_schema(conn: sqlite3.Connection):
    """Create the dns_queries table and its query-pattern indexes.

    Idempotent: every statement uses IF NOT EXISTS, so calling this on an
    already-initialized database changes nothing.
    """
    cursor = conn.cursor()
    # One denormalized row per DNS query: metadata first, then timing,
    # sizes, PCAP-derived network metrics, and the response.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS dns_queries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            -- Metadata
            provider TEXT NOT NULL,
            protocol TEXT NOT NULL,
            dnssec_mode TEXT NOT NULL CHECK(dnssec_mode IN ('off', 'auth', 'trust')),
            -- Query details
            domain TEXT NOT NULL,
            query_type TEXT NOT NULL,
            keep_alive BOOLEAN NOT NULL,
            dns_server TEXT NOT NULL,
            -- Timing
            timestamp TEXT NOT NULL,
            timestamp_unix REAL NOT NULL,
            duration_ns INTEGER NOT NULL,
            duration_ms REAL NOT NULL,
            -- Size metrics
            request_size_bytes INTEGER,
            response_size_bytes INTEGER,
            -- Network metrics (from PCAP)
            bytes_sent INTEGER DEFAULT 0,
            bytes_received INTEGER DEFAULT 0,
            packets_sent INTEGER DEFAULT 0,
            packets_received INTEGER DEFAULT 0,
            total_bytes INTEGER DEFAULT 0,
            -- Response
            response_code TEXT,
            error TEXT
        )
    """)
    # Indexes matching the dashboard's common filter/group-by patterns.
    index_ddl = [
        ('idx_provider', 'provider'),
        ('idx_protocol', 'protocol'),
        ('idx_dnssec_mode', 'dnssec_mode'),
        ('idx_keep_alive', 'keep_alive'),
        ('idx_provider_protocol_dnssec', 'provider, protocol, dnssec_mode'),
        ('idx_timestamp', 'timestamp_unix'),
        ('idx_domain', 'domain'),
    ]
    for index_name, columns in index_ddl:
        cursor.execute(
            f"CREATE INDEX IF NOT EXISTS {index_name} ON dns_queries({columns})"
        )
    conn.commit()
def parse_protocol_and_dnssec(filename: str) -> tuple[str, str, bool]:
    """
    Extract base protocol, DNSSEC mode, and keep_alive from filename.
    Returns (base_protocol, dnssec_mode, keep_alive)
    Examples:
        'udp.csv' -> ('udp', 'off', False)
        'udp-auth.csv' -> ('udp', 'auth', False)
        'tls.csv' -> ('tls', 'off', False)
        'tls-persist.csv' -> ('tls', 'off', True)
        'https-persist.csv' -> ('https', 'off', True)
        'https-auth-persist.csv' -> ('https', 'auth', True)
        'https-trust-persist.csv' -> ('https', 'trust', True)
        'doh3-auth.csv' -> ('doh3', 'auth', False)
        'doq.csv' -> ('doq', 'off', False)
    """
    def _strip_suffix(value: str, suffix: str) -> str:
        # str.replace() would also rewrite matches in the MIDDLE of the
        # name; remove only the trailing occurrence.
        return value[:-len(suffix)] if value.endswith(suffix) else value

    name = _strip_suffix(filename, '.csv')
    # Check for persist suffix (keep_alive)
    keep_alive = name.endswith('-persist')
    name = _strip_suffix(name, '-persist')
    # Check for DNSSEC suffix
    dnssec_mode = 'off'
    if name.endswith('-auth'):
        dnssec_mode = 'auth'
        name = _strip_suffix(name, '-auth')
    elif name.endswith('-trust'):
        dnssec_mode = 'trust'
        name = _strip_suffix(name, '-trust')
    # For UDP, DoH3, and DoQ, keep_alive doesn't apply (connectionless)
    if name in ['udp', 'doh3', 'doq']:
        keep_alive = False
    return (name, dnssec_mode, keep_alive)
def str_to_bool(value: str) -> bool:
    """Interpret 'true'/'1'/'yes' (any case) as True; anything else is False."""
    truthy = {'true', '1', 'yes'}
    return value.lower() in truthy
def import_csv_to_db(
    csv_path: Path,
    provider: str,
    conn: sqlite3.Connection
) -> int:
    """Import a CSV file into the database.

    Protocol, DNSSEC mode and keep-alive are derived from the filename via
    parse_protocol_and_dnssec; malformed rows are skipped with a warning.
    A single commit is issued at the end, so a crash mid-file leaves no
    partial rows. Returns the number of rows inserted.
    """
    protocol, dnssec_mode, keep_alive_from_filename = parse_protocol_and_dnssec(csv_path.name)
    cursor = conn.cursor()
    rows_imported = 0
    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                # Parse timestamp to Unix epoch
                dt = date_parser.isoparse(row['timestamp'])
                timestamp_unix = dt.timestamp()
                # Use keep_alive from filename (more reliable than CSV)
                keep_alive = keep_alive_from_filename
                # Handle optional fields (may not exist in older CSVs)
                # `or 0` also covers empty-string cells, not just missing keys.
                bytes_sent = int(row.get('bytes_sent', 0) or 0)
                bytes_received = int(row.get('bytes_received', 0) or 0)
                packets_sent = int(row.get('packets_sent', 0) or 0)
                packets_received = int(row.get('packets_received', 0) or 0)
                total_bytes = int(row.get('total_bytes', 0) or 0)
                # Column order must match the VALUES tuple below exactly.
                cursor.execute("""
                    INSERT INTO dns_queries (
                        provider, protocol, dnssec_mode,
                        domain, query_type, keep_alive,
                        dns_server, timestamp, timestamp_unix,
                        duration_ns, duration_ms,
                        request_size_bytes, response_size_bytes,
                        bytes_sent, bytes_received, packets_sent, packets_received, total_bytes,
                        response_code, error
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    provider,
                    protocol,
                    dnssec_mode,
                    row['domain'],
                    row['query_type'],
                    keep_alive,
                    row['dns_server'],
                    row['timestamp'],
                    timestamp_unix,
                    int(row['duration_ns']),
                    float(row['duration_ms']),
                    int(row.get('request_size_bytes') or 0),
                    int(row.get('response_size_bytes') or 0),
                    bytes_sent,
                    bytes_received,
                    packets_sent,
                    packets_received,
                    total_bytes,
                    row.get('response_code', ''),
                    row.get('error', '')
                ))
                rows_imported += 1
            except Exception as e:
                # Best-effort import: one bad row must not abort the file.
                print(f" Warning: Skipping row - {e}")
                continue
    conn.commit()
    return rows_imported
def main():
    """Main import pipeline.

    Rebuilds dns.db from scratch: deletes any existing database file,
    creates the schema, imports every provider CSV under results/, then
    prints distribution summaries and example SQL for Metabase dashboards.
    """
    print("\n" + "="*60)
    print("CSV to SQLite Database Converter")
    print("="*60)
    results_dir = Path('results')
    db_path = Path('dns.db')
    if not results_dir.exists():
        print(f"\n❌ Error: '{results_dir}' directory not found")
        return
    # Remove existing database
    if db_path.exists():
        print(f"\n⚠ Removing existing database: {db_path}")
        db_path.unlink()
    # Create database and schema
    print(f"\n📊 Creating database: {db_path}")
    conn = sqlite3.connect(db_path)
    create_database_schema(conn)
    print("✓ Schema created")
    # Import CSVs
    providers = ['adguard', 'cloudflare', 'google', 'quad9']
    total_rows = 0
    total_files = 0
    for provider in providers:
        provider_path = results_dir / provider
        if not provider_path.exists():
            print(f"\n⚠ Skipping {provider} - directory not found")
            continue
        print(f"\n{'='*60}")
        print(f"Importing: {provider.upper()}")
        print(f"{'='*60}")
        csv_files = sorted(provider_path.glob('*.csv'))
        provider_rows = 0
        provider_files = 0
        for csv_path in csv_files:
            # Skip backup files
            if '.bak' in csv_path.name:
                continue
            protocol, dnssec, keep_alive = parse_protocol_and_dnssec(csv_path.name)
            ka_str = "persistent" if keep_alive else "non-persist"
            print(f" 📄 {csv_path.name:30}{protocol:8} (DNSSEC: {dnssec:5}, {ka_str})")
            rows = import_csv_to_db(csv_path, provider, conn)
            print(f" ✓ Imported {rows:,} rows")
            provider_rows += rows
            provider_files += 1
        print(f"\n Total: {provider_files} files, {provider_rows:,} rows")
        total_rows += provider_rows
        total_files += provider_files
    # Create summary
    print(f"\n{'='*60}")
    print("Database Summary")
    print(f"{'='*60}")
    cursor = conn.cursor()
    # Total counts
    cursor.execute("SELECT COUNT(*) FROM dns_queries")
    total_queries = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT provider) FROM dns_queries")
    unique_providers = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT protocol) FROM dns_queries")
    unique_protocols = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT domain) FROM dns_queries")
    unique_domains = cursor.fetchone()[0]
    # NOTE(review): if nothing was imported, total_queries is 0 and the
    # percentage computations below divide by zero — confirm the tool is
    # never run against an empty results/ tree.
    print(f"\nTotal queries: {total_queries:,}")
    print(f"Providers: {unique_providers}")
    print(f"Protocols: {unique_protocols}")
    print(f"Unique domains: {unique_domains}")
    # Show breakdown by provider, protocol, DNSSEC, and keep_alive
    print(f"\nBreakdown by Provider, Protocol, DNSSEC & Keep-Alive:")
    print(f"{'-'*80}")
    cursor.execute("""
        SELECT provider, protocol, dnssec_mode, keep_alive, COUNT(*) as count
        FROM dns_queries
        GROUP BY provider, protocol, dnssec_mode, keep_alive
        ORDER BY provider, protocol, dnssec_mode, keep_alive
    """)
    current_provider = None
    for provider, protocol, dnssec, keep_alive, count in cursor.fetchall():
        # Blank line between provider groups.
        if current_provider != provider:
            if current_provider is not None:
                print()
            current_provider = provider
        # NOTE(review): both branches are empty strings — checkmark/cross
        # glyphs were likely lost in transit; confirm the intended markers.
        ka_str = "" if keep_alive else ""
        print(f" {provider:12} | {protocol:8} | {dnssec:5} | KA:{ka_str} | {count:6,} queries")
    # Protocol distribution
    print(f"\n{'-'*80}")
    print("Protocol Distribution:")
    print(f"{'-'*80}")
    cursor.execute("""
        SELECT protocol, COUNT(*) as count
        FROM dns_queries
        GROUP BY protocol
        ORDER BY protocol
    """)
    for protocol, count in cursor.fetchall():
        pct = (count / total_queries) * 100
        print(f" {protocol:8} | {count:8,} queries ({pct:5.1f}%)")
    # DNSSEC mode distribution
    print(f"\n{'-'*80}")
    print("DNSSEC Mode Distribution:")
    print(f"{'-'*80}")
    cursor.execute("""
        SELECT dnssec_mode, COUNT(*) as count
        FROM dns_queries
        GROUP BY dnssec_mode
        ORDER BY dnssec_mode
    """)
    for dnssec_mode, count in cursor.fetchall():
        pct = (count / total_queries) * 100
        print(f" {dnssec_mode:5} | {count:8,} queries ({pct:5.1f}%)")
    # Keep-Alive distribution
    print(f"\n{'-'*80}")
    print("Keep-Alive Distribution:")
    print(f"{'-'*80}")
    cursor.execute("""
        SELECT keep_alive, COUNT(*) as count
        FROM dns_queries
        GROUP BY keep_alive
    """)
    for keep_alive, count in cursor.fetchall():
        ka_label = "Persistent" if keep_alive else "Non-persistent"
        pct = (count / total_queries) * 100
        print(f" {ka_label:15} | {count:8,} queries ({pct:5.1f}%)")
    conn.close()
    print(f"\n{'='*60}")
    print(f"✓ Database created successfully: {db_path}")
    print(f" Total: {total_files} files, {total_rows:,} rows")
    print(f"{'='*60}\n")
    # Print usage examples
    print("\n📖 Usage Examples for Metabase:")
    print(f"{'-'*60}")
    print("\n1. Compare protocols (DNSSEC off, persistent only):")
    print(""" SELECT provider, protocol,
    AVG(duration_ms) as avg_latency,
    AVG(total_bytes) as avg_bytes
    FROM dns_queries
    WHERE dnssec_mode = 'off' AND keep_alive = 1
    GROUP BY provider, protocol;""")
    print("\n2. DNSSEC impact on UDP:")
    print(""" SELECT provider, dnssec_mode,
    AVG(duration_ms) as avg_latency
    FROM dns_queries
    WHERE protocol = 'udp'
    GROUP BY provider, dnssec_mode;""")
    print("\n3. Keep-alive impact on TLS:")
    print(""" SELECT provider, keep_alive,
    AVG(duration_ms) as avg_latency,
    AVG(total_bytes) as avg_bytes
    FROM dns_queries
    WHERE protocol = 'tls' AND dnssec_mode = 'off'
    GROUP BY provider, keep_alive;""")
    print("\n4. Time series for line graphs:")
    print(""" SELECT timestamp_unix, duration_ms, total_bytes
    FROM dns_queries
    WHERE provider = 'cloudflare'
    AND protocol = 'https'
    AND dnssec_mode = 'off'
    AND keep_alive = 1
    ORDER BY timestamp_unix;""")
    print("\n5. Overall comparison table:")
    print(""" SELECT protocol, dnssec_mode, keep_alive,
    COUNT(*) as queries,
    AVG(duration_ms) as avg_latency,
    AVG(total_bytes) as avg_bytes
    FROM dns_queries
    GROUP BY protocol, dnssec_mode, keep_alive
    ORDER BY protocol, dnssec_mode, keep_alive;""")
    print(f"\n{'-'*60}\n")


if __name__ == '__main__':
    main()