From 530be2744019ffe0872c66af348e45e8d14f4529 Mon Sep 17 00:00:00 2001
From: afranco
Date: Mon, 13 Oct 2025 00:10:15 +0100
Subject: [PATCH] fix(scripts): change add metrics to csv

---
 scripts/tools/add_extra_metrics_to_csv.py | 254 ++++++++++------------
 1 file changed, 110 insertions(+), 144 deletions(-)

diff --git a/scripts/tools/add_extra_metrics_to_csv.py b/scripts/tools/add_extra_metrics_to_csv.py
index d7884e8..f537e66 100644
--- a/scripts/tools/add_extra_metrics_to_csv.py
+++ b/scripts/tools/add_extra_metrics_to_csv.py
@@ -1,116 +1,68 @@
 #!/usr/bin/env python3
 """
 Add network metrics from PCAP files to DNS CSV files.
-Adds: pcap_network_bytes_in, pcap_network_bytes_out, pcap_overhead_bytes
+Adds: raw_bytes_total, raw_packet_count, overhead_bytes, efficiency_percent
 """
 
 import csv
 import os
 import argparse
+import re
 from pathlib import Path
 from datetime import datetime, timezone
-import dpkt
-import socket
-
-# Test machine IPs
-TEST_IPS = {
-    '10.0.0.50',
-    '2001:818:e73e:ba00:5506:dfd4:ed8b:96e',
-    'fe80::fe98:c62e:4463:9a2d'
-}
-
-
-def inet_to_str(inet):
-    """Convert inet bytes to IP string"""
-    try:
-        return socket.inet_ntop(socket.AF_INET, inet)
-    except ValueError:
-        try:
-            return socket.inet_ntop(socket.AF_INET6, inet)
-        except ValueError:
-            return None
+from scapy.all import rdpcap
 
 
+def parse_timestamp(ts_str):
+    """Parse timestamp with timezone and nanoseconds (RFC3339Nano)."""
+    match = re.match(
+        r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\.(\d+)([\+\-]\d{2}:\d{2})',
+        ts_str
+    )
+
+    if not match:
+        raise ValueError(f"Invalid timestamp format: {ts_str}")
+
+    base, nanos, tz = match.groups()
+    micros = nanos[:6].ljust(6, '0')  # datetime holds at most microseconds
+    iso_str = f"{base}.{micros}{tz}"
+    dt = datetime.fromisoformat(iso_str)
+    full_nanos = int(nanos.ljust(9, '0'))
+
+    return dt, full_nanos
 
 
 def read_pcap(pcap_path):
-    """Read PCAP and return list of (timestamp_ns, size, src_ip, dst_ip)"""
+    """Read PCAP and return list of (timestamp_epoch, size)."""
     packets = []
-
-    with open(pcap_path, 'rb') as f:
-        try:
-            pcap = dpkt.pcap.Reader(f)
-        except:
-            f.seek(0)
-            pcap = dpkt.pcapng.Reader(f)
-
-        for ts, buf in pcap:
-            try:
-                # Convert PCAP timestamp (float seconds) to nanoseconds
-                timestamp_ns = int(ts * 1_000_000_000)
-                size = len(buf)
-                eth = dpkt.ethernet.Ethernet(buf)
-
-                src_ip = dst_ip = None
-
-                if isinstance(eth.data, dpkt.ip.IP):
-                    src_ip = inet_to_str(eth.data.src)
-                    dst_ip = inet_to_str(eth.data.dst)
-                elif isinstance(eth.data, dpkt.ip6.IP6):
-                    src_ip = inet_to_str(eth.data.src)
-                    dst_ip = inet_to_str(eth.data.dst)
-
-                if src_ip and dst_ip:
-                    packets.append((timestamp_ns, size, src_ip, dst_ip))
-
-            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
-                continue
+    try:
+        pkts = rdpcap(str(pcap_path))
+        for pkt in pkts:
+            timestamp = float(pkt.time)
+            length = len(pkt)
+            packets.append((timestamp, length))
+    except Exception as e:
+        print(f"  ❌ Error reading PCAP: {e}")
+        return []
 
     return packets
 
-
-def find_packets_in_window(packets, start_ns, duration_ns):
-    """Find packets within exact time window (nanosecond precision)"""
-    end_ns = start_ns + duration_ns
+def find_packets_in_window(packets, start_ts, start_nanos, duration_ns):
+    """Find packets within exact time window."""
+    start_epoch = start_ts.timestamp()
+    start_epoch += (start_nanos % 1_000) / 1_000_000_000  # only the sub-microsecond remainder; start_ts already carries the microseconds
+    end_epoch = start_epoch + (duration_ns / 1_000_000_000)
 
-    matching = []
-    for timestamp_ns, size, src_ip, dst_ip in packets:
-        if start_ns <= timestamp_ns <= end_ns:
-            matching.append((size, src_ip, dst_ip))
+    total_bytes = 0
+    packet_count = 0
 
-    return matching
-
-
-def calculate_metrics(packets):
-    """Calculate network metrics from packets"""
-    bytes_in = 0
-    bytes_out = 0
+    for pkt_ts, pkt_len in packets:
+        if start_epoch <= pkt_ts <= end_epoch:
+            total_bytes += pkt_len
+            packet_count += 1
 
-    for size, src_ip, dst_ip in packets:
-        if dst_ip in TEST_IPS:
-            bytes_in += size
-        elif src_ip in TEST_IPS:
-            bytes_out += size
-
-    return {
-        'pcap_network_bytes_in': bytes_in,
-        'pcap_network_bytes_out': bytes_out,
-        'pcap_overhead_bytes': bytes_in + bytes_out
-    }
-
-
-def parse_timestamp_to_ns(ts_str):
-    """Parse ISO timestamp to nanoseconds since epoch"""
-    try:
-        dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
-        if dt.tzinfo is not None:
-            dt = dt.astimezone(timezone.utc)
-        # Convert to nanoseconds since epoch
-        return int(dt.timestamp() * 1_000_000_000)
-    except ValueError:
-        return None
-
+    return total_bytes, packet_count
 
 def enhance_csv(csv_path, pcap_path, output_path, debug=False):
-    """Add PCAP metrics to CSV"""
+    """Add PCAP metrics to CSV."""
     if not os.path.exists(pcap_path):
         print(f"⚠️  PCAP not found: {pcap_path}")
         return False
@@ -118,75 +70,91 @@ def enhance_csv(csv_path, pcap_path, output_path, debug=False):
     print(f"Processing: {os.path.basename(csv_path)}")
 
     # Read PCAP
-    try:
-        packets = read_pcap(pcap_path)
-        print(f"  Loaded {len(packets)} packets")
-
-        if packets and debug:
-            first_pcap_ns = packets[0][0]
-            last_pcap_ns = packets[-1][0]
-            print(f"  First PCAP packet: {first_pcap_ns} ns")
-            print(f"  Last PCAP packet: {last_pcap_ns} ns")
-            print(f"  PCAP duration: {(last_pcap_ns - first_pcap_ns) / 1e9:.3f}s")
-
-    except Exception as e:
-        print(f"  ❌ Error reading PCAP: {e}")
-        return False
+    packets = read_pcap(pcap_path)
+    print(f"  Loaded {len(packets)} packets")
 
     if not packets:
         print("  ❌ No packets found")
         return False
 
+    if packets and debug:
+        first_pcap = packets[0][0]
+        last_pcap = packets[-1][0]
+        print(f"  First PCAP packet: {first_pcap:.6f}")
+        print(f"  Last PCAP packet: {last_pcap:.6f}")
+        print(f"  PCAP duration: {(last_pcap - first_pcap):.3f}s")
+
     # Read CSV
     with open(csv_path, 'r', newline='') as f:
         reader = csv.DictReader(f)
         fieldnames = list(reader.fieldnames) + [
-            'pcap_network_bytes_in',
-            'pcap_network_bytes_out',
-            'pcap_overhead_bytes'
+            'raw_bytes_total',
+            'raw_packet_count',
+            'overhead_bytes',
+            'efficiency_percent'
         ]
         rows = list(reader)
 
     if rows and debug:
-        first_csv_ns = parse_timestamp_to_ns(rows[0]['timestamp'])
-        last_csv_ns = parse_timestamp_to_ns(rows[-1]['timestamp'])
-        if first_csv_ns and last_csv_ns:
-            print(f"  First CSV query: {first_csv_ns} ns")
-            print(f"  Last CSV query: {last_csv_ns} ns")
-            print(f"  CSV duration: {(last_csv_ns - first_csv_ns) / 1e9:.3f}s")
-
-            # Check alignment
-            offset_ns = packets[0][0] - first_csv_ns
-            print(f"  Time offset (PCAP - CSV): {offset_ns / 1e9:.3f}s")
+        try:
+            first_ts, _ = parse_timestamp(rows[0]['timestamp'])
+            last_ts, _ = parse_timestamp(rows[-1]['timestamp'])
+            print(f"  First CSV query: {first_ts.timestamp():.6f}")
+            print(f"  Last CSV query: {last_ts.timestamp():.6f}")
+            offset = packets[0][0] - first_ts.timestamp()
+            print(f"  Time offset (PCAP - CSV): {offset:.3f}s")
+        except (ValueError, KeyError):
+            pass
 
     # Enhance rows
     enhanced = []
    matched = 0
 
     for i, row in enumerate(rows):
-        ts_ns = parse_timestamp_to_ns(row['timestamp'])
-        if not ts_ns:
-            continue
+        try:
+            timestamp, nanos = parse_timestamp(row['timestamp'])
+            duration_ns = int(row['duration_ns'])
+
+            raw_bytes, packet_count = find_packets_in_window(
+                packets, timestamp, nanos, duration_ns
+            )
+
+            useful_bytes = (
+                int(row['request_size_bytes']) +
+                int(row['response_size_bytes'])
+            )
+            overhead = raw_bytes - useful_bytes
+            efficiency = (
+                (useful_bytes / raw_bytes * 100)
+                if raw_bytes > 0 else 0
+            )
+
+            row['raw_bytes_total'] = raw_bytes
+            row['raw_packet_count'] = packet_count
+            row['overhead_bytes'] = overhead
+            row['efficiency_percent'] = f"{efficiency:.2f}"
+
+            if raw_bytes > 0:
+                matched += 1
+
+            # Debug first few queries
+            if debug and i < 3:
+                print(f"  Query {i}: {row['domain']}")
+                print(f"    Duration: {duration_ns / 1e6:.3f}ms")
+                print(f"    Matched packets: {packet_count}")
+                print(f"    Raw bytes: {raw_bytes}")
+                print(f"    Useful bytes: {useful_bytes}")
+                print(f"    Efficiency: {efficiency:.2f}%")
+
+        except (ValueError, KeyError) as e:
+            if debug:
+                print(f"  Error processing row {i}: {e}")
+            row['raw_bytes_total'] = 0
+            row['raw_packet_count'] = 0
+            row['overhead_bytes'] = 0
+            row['efficiency_percent'] = "0.00"
 
-        duration_ns = int(row.get('duration_ns', 0))
-
-        matching_packets = find_packets_in_window(packets, ts_ns, duration_ns)
-
-        metrics = calculate_metrics(matching_packets)
-        row.update(metrics)
         enhanced.append(row)
-
-        if metrics['pcap_overhead_bytes'] > 0:
-            matched += 1
-
-        # Debug first few queries
-        if debug and i < 3:
-            print(f"  Query {i}: {row['domain']}")
-            print(f"    Start: {ts_ns} ns")
-            print(f"    Duration: {duration_ns} ns ({duration_ns / 1e6:.3f}ms)")
-            print(f"    End: {ts_ns + duration_ns} ns")
-            print(f"    Matched packets: {len(matching_packets)}")
-            print(f"    Bytes: {metrics['pcap_overhead_bytes']}")
 
     print(f"  Matched: {matched}/{len(rows)} queries")
 
@@ -204,16 +172,15 @@ def enhance_csv(csv_path, pcap_path, output_path, debug=False):
     print(f"  ✓ Saved: {output_path}")
     return True
 
-
 def main():
     parser = argparse.ArgumentParser(
         description='Add PCAP network metrics to DNS CSV files'
     )
-    parser.add_argument('input_dir', help='Input directory (e.g., results_merged)')
+    parser.add_argument('input_dir', help='Input directory (e.g., results)')
     parser.add_argument(
         '--output',
-        default='./results_enhanced',
-        help='Output directory (default: ./results_enhanced)'
+        default='./results_enriched',
+        help='Output directory (default: ./results_enriched)'
     )
     parser.add_argument(
         '--dry-run',
@@ -279,6 +246,5 @@ def main():
 
     return 0 if failed == 0 else 1
 
-
 if __name__ == "__main__":
     exit(main())
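
Reviewer note, not part of the commit: the subtle piece of this change is recombining the microsecond-limited datetime with the original nanosecond field when positioning the capture window. Since datetime.fromisoformat keeps only the first six fractional digits, only the sub-microsecond remainder (full_nanos % 1_000) may be added back; using % 1_000_000 would double-count the microseconds. Below is a minimal standalone sanity check of that arithmetic. It assumes Python 3.7+ (for datetime.fromisoformat with a UTC offset) and uses an invented sample timestamp, so nothing here comes from real capture data.

#!/usr/bin/env python3
# Sanity check for the RFC3339Nano handling in add_extra_metrics_to_csv.py.
# The timestamp below is a made-up example, not taken from real captures.
import re
from datetime import datetime


def parse_timestamp(ts_str):
    # Same logic as the patched script: datetime holds at most microseconds,
    # so the full nanosecond value is returned separately.
    match = re.match(
        r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\.(\d+)([\+\-]\d{2}:\d{2})',
        ts_str
    )
    if not match:
        raise ValueError(f"Invalid timestamp format: {ts_str}")
    base, nanos, tz = match.groups()
    micros = nanos[:6].ljust(6, '0')
    dt = datetime.fromisoformat(f"{base}.{micros}{tz}")
    return dt, int(nanos.ljust(9, '0'))


dt, full_nanos = parse_timestamp("2025-10-13T00:10:15.123456789+01:00")
assert dt.microsecond == 123456
assert full_nanos == 123456789
# dt.timestamp() already contains the .123456 part, so only the trailing
# 789 ns remain to be added back when computing the window start.
start_epoch = dt.timestamp() + (full_nanos % 1_000) / 1_000_000_000
print(f"window start: {start_epoch:.9f}")  # ns digits sit at the edge of float precision

With the patch applied the script is invoked as before, for example: python3 scripts/tools/add_extra_metrics_to_csv.py results --output ./results_enriched. Each CSV row then gains the raw_bytes_total, raw_packet_count, overhead_bytes and efficiency_percent columns.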