fix(scripts): change add metrics to csv

This commit is contained in:
2025-10-13 00:10:15 +01:00
parent a64d6c6bdb
commit 530be27440

View File

@@ -1,116 +1,68 @@
#!/usr/bin/env python3
"""
Add network metrics from PCAP files to DNS CSV files.
Adds: pcap_network_bytes_in, pcap_network_bytes_out, pcap_overhead_bytes
Adds: raw_bytes_total, raw_packet_count, overhead_bytes, efficiency_percent
"""
import csv
import os
import argparse
import re
from pathlib import Path
from datetime import datetime, timezone
import dpkt
import socket
from scapy.all import rdpcap
# Test machine IPs
TEST_IPS = {
'10.0.0.50',
'2001:818:e73e:ba00:5506:dfd4:ed8b:96e',
'fe80::fe98:c62e:4463:9a2d'
}
def parse_timestamp(ts_str):
"""Parse timestamp with timezone and nanoseconds (RFC3339Nano)."""
match = re.match(
r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\.(\d+)([\+\-]\d{2}:\d{2})',
ts_str
)
if not match:
raise ValueError(f"Invalid timestamp format: {ts_str}")
def inet_to_str(inet):
"""Convert inet bytes to IP string"""
try:
return socket.inet_ntop(socket.AF_INET, inet)
except ValueError:
try:
return socket.inet_ntop(socket.AF_INET6, inet)
except ValueError:
return None
base, nanos, tz = match.groups()
micros = nanos[:6].ljust(6, '0')
iso_str = f"{base}.{micros}{tz}"
dt = datetime.fromisoformat(iso_str)
full_nanos = int(nanos.ljust(9, '0'))
return dt, full_nanos
def read_pcap(pcap_path):
"""Read PCAP and return list of (timestamp_ns, size, src_ip, dst_ip)"""
"""Read PCAP and return list of (timestamp_epoch, size)."""
packets = []
with open(pcap_path, 'rb') as f:
try:
pcap = dpkt.pcap.Reader(f)
except:
f.seek(0)
pcap = dpkt.pcapng.Reader(f)
for ts, buf in pcap:
try:
# Convert PCAP timestamp (float seconds) to nanoseconds
timestamp_ns = int(ts * 1_000_000_000)
size = len(buf)
eth = dpkt.ethernet.Ethernet(buf)
src_ip = dst_ip = None
if isinstance(eth.data, dpkt.ip.IP):
src_ip = inet_to_str(eth.data.src)
dst_ip = inet_to_str(eth.data.dst)
elif isinstance(eth.data, dpkt.ip6.IP6):
src_ip = inet_to_str(eth.data.src)
dst_ip = inet_to_str(eth.data.dst)
if src_ip and dst_ip:
packets.append((timestamp_ns, size, src_ip, dst_ip))
except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
continue
pkts = rdpcap(str(pcap_path))
for pkt in pkts:
timestamp = float(pkt.time)
length = len(pkt)
packets.append((timestamp, length))
except Exception as e:
print(f" ❌ Error reading PCAP: {e}")
return []
return packets
def find_packets_in_window(packets, start_ts, start_nanos, duration_ns):
"""Find packets within exact time window."""
start_epoch = start_ts.timestamp()
start_epoch += (start_nanos % 1_000_000) / 1_000_000_000
end_epoch = start_epoch + (duration_ns / 1_000_000_000)
def find_packets_in_window(packets, start_ns, duration_ns):
"""Find packets within exact time window (nanosecond precision)"""
end_ns = start_ns + duration_ns
total_bytes = 0
packet_count = 0
matching = []
for timestamp_ns, size, src_ip, dst_ip in packets:
if start_ns <= timestamp_ns <= end_ns:
matching.append((size, src_ip, dst_ip))
return matching
def calculate_metrics(packets):
"""Calculate network metrics from packets"""
bytes_in = 0
bytes_out = 0
for size, src_ip, dst_ip in packets:
if dst_ip in TEST_IPS:
bytes_in += size
elif src_ip in TEST_IPS:
bytes_out += size
return {
'pcap_network_bytes_in': bytes_in,
'pcap_network_bytes_out': bytes_out,
'pcap_overhead_bytes': bytes_in + bytes_out
}
def parse_timestamp_to_ns(ts_str):
"""Parse ISO timestamp to nanoseconds since epoch"""
try:
dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
if dt.tzinfo is not None:
dt = dt.astimezone(timezone.utc)
# Convert to nanoseconds since epoch
return int(dt.timestamp() * 1_000_000_000)
except ValueError:
return None
for pkt_ts, pkt_len in packets:
if start_epoch <= pkt_ts <= end_epoch:
total_bytes += pkt_len
packet_count += 1
return total_bytes, packet_count
def enhance_csv(csv_path, pcap_path, output_path, debug=False):
"""Add PCAP metrics to CSV"""
"""Add PCAP metrics to CSV."""
if not os.path.exists(pcap_path):
print(f"⚠️ PCAP not found: {pcap_path}")
return False
@@ -118,75 +70,91 @@ def enhance_csv(csv_path, pcap_path, output_path, debug=False):
print(f"Processing: {os.path.basename(csv_path)}")
# Read PCAP
try:
packets = read_pcap(pcap_path)
print(f" Loaded {len(packets)} packets")
if packets and debug:
first_pcap_ns = packets[0][0]
last_pcap_ns = packets[-1][0]
print(f" First PCAP packet: {first_pcap_ns} ns")
print(f" Last PCAP packet: {last_pcap_ns} ns")
print(f" PCAP duration: {(last_pcap_ns - first_pcap_ns) / 1e9:.3f}s")
except Exception as e:
print(f" ❌ Error reading PCAP: {e}")
return False
if not packets:
print(" ❌ No packets found")
return False
if packets and debug:
first_pcap = packets[0][0]
last_pcap = packets[-1][0]
print(f" First PCAP packet: {first_pcap:.6f}")
print(f" Last PCAP packet: {last_pcap:.6f}")
print(f" PCAP duration: {(last_pcap - first_pcap):.3f}s")
# Read CSV
with open(csv_path, 'r', newline='') as f:
reader = csv.DictReader(f)
fieldnames = list(reader.fieldnames) + [
'pcap_network_bytes_in',
'pcap_network_bytes_out',
'pcap_overhead_bytes'
'raw_bytes_total',
'raw_packet_count',
'overhead_bytes',
'efficiency_percent'
]
rows = list(reader)
if rows and debug:
first_csv_ns = parse_timestamp_to_ns(rows[0]['timestamp'])
last_csv_ns = parse_timestamp_to_ns(rows[-1]['timestamp'])
if first_csv_ns and last_csv_ns:
print(f" First CSV query: {first_csv_ns} ns")
print(f" Last CSV query: {last_csv_ns} ns")
print(f" CSV duration: {(last_csv_ns - first_csv_ns) / 1e9:.3f}s")
# Check alignment
offset_ns = packets[0][0] - first_csv_ns
print(f" Time offset (PCAP - CSV): {offset_ns / 1e9:.3f}s")
try:
first_ts, _ = parse_timestamp(rows[0]['timestamp'])
last_ts, _ = parse_timestamp(rows[-1]['timestamp'])
print(f" First CSV query: {first_ts.timestamp():.6f}")
print(f" Last CSV query: {last_ts.timestamp():.6f}")
offset = packets[0][0] - first_ts.timestamp()
print(f" Time offset (PCAP - CSV): {offset:.3f}s")
except:
pass
# Enhance rows
enhanced = []
matched = 0
for i, row in enumerate(rows):
ts_ns = parse_timestamp_to_ns(row['timestamp'])
if not ts_ns:
continue
try:
timestamp, nanos = parse_timestamp(row['timestamp'])
duration_ns = int(row['duration_ns'])
duration_ns = int(row.get('duration_ns', 0))
raw_bytes, packet_count = find_packets_in_window(
packets, timestamp, nanos, duration_ns
)
matching_packets = find_packets_in_window(packets, ts_ns, duration_ns)
useful_bytes = (
int(row['request_size_bytes']) +
int(row['response_size_bytes'])
)
overhead = raw_bytes - useful_bytes
efficiency = (
(useful_bytes / raw_bytes * 100)
if raw_bytes > 0 else 0
)
metrics = calculate_metrics(matching_packets)
row.update(metrics)
enhanced.append(row)
row['raw_bytes_total'] = raw_bytes
row['raw_packet_count'] = packet_count
row['overhead_bytes'] = overhead
row['efficiency_percent'] = f"{efficiency:.2f}"
if metrics['pcap_overhead_bytes'] > 0:
if raw_bytes > 0:
matched += 1
# Debug first few queries
if debug and i < 3:
print(f" Query {i}: {row['domain']}")
print(f" Start: {ts_ns} ns")
print(f" Duration: {duration_ns} ns ({duration_ns / 1e6:.3f}ms)")
print(f" End: {ts_ns + duration_ns} ns")
print(f" Matched packets: {len(matching_packets)}")
print(f" Bytes: {metrics['pcap_overhead_bytes']}")
print(f" Duration: {duration_ns / 1e6:.3f}ms")
print(f" Matched packets: {packet_count}")
print(f" Raw bytes: {raw_bytes}")
print(f" Useful bytes: {useful_bytes}")
print(f" Efficiency: {efficiency:.2f}%")
except (ValueError, KeyError) as e:
if debug:
print(f" Error processing row {i}: {e}")
row['raw_bytes_total'] = 0
row['raw_packet_count'] = 0
row['overhead_bytes'] = 0
row['efficiency_percent'] = "0.00"
enhanced.append(row)
print(f" Matched: {matched}/{len(rows)} queries")
@@ -204,16 +172,15 @@ def enhance_csv(csv_path, pcap_path, output_path, debug=False):
print(f" ✓ Saved: {output_path}")
return True
def main():
parser = argparse.ArgumentParser(
description='Add PCAP network metrics to DNS CSV files'
)
parser.add_argument('input_dir', help='Input directory (e.g., results_merged)')
parser.add_argument('input_dir', help='Input directory (e.g., results)')
parser.add_argument(
'--output',
default='./results_enhanced',
help='Output directory (default: ./results_enhanced)'
default='./results_enriched',
help='Output directory (default: ./results_enriched)'
)
parser.add_argument(
'--dry-run',
@@ -279,6 +246,5 @@ def main():
return 0 if failed == 0 else 1
if __name__ == "__main__":
exit(main())