کپچر کردن ترافیک شبکه با tcpdump و تجزیه و تحلیل آن با tshark

از پاپیروس
پرش به ناوبریپرش به جستجو

این دستور ۳ فایل می‌سازد که هر یک به مدت ۶۰ ثانیه ترافیک را نگه می‌دارد.

tcpdump -i eth0 -s 0 -w 'DUMP-%Y%m%d%H%M%S.pcap' -G 60 -W 3

تجزیه و تحلیل یک فایل dump

  • ۱
tshark -r dump.pcap -n   -T fields -E separator=, -E quote=n -E header=n   -e frame.cap_len -e ip.src -e ip.dst -e ipv6.src -e ipv6.dst -e eth.src -e eth.dst | awk -F',' 'BEGIN{OFS=","}
  {
    cap = $1 + 0; 
    src = ($2 != "" ? $2 : ($4 != "" ? $4 : ($6 != "" ? $6 : "UNKNOWN")));
    dst = ($3 != "" ? $3 : ($5 != "" ? $5 : ($7 != "" ? $7 : "UNKNOWN")));
    k = src "," dst;
    bytes[k] += cap;
    total += cap;
  } 
  END{
    print "src,dst,bytes";
    for (k in bytes) print k "," bytes[k];
  }' | sort -t, -k3,3nr > result.csv
  • ۲
tshark -r dump.pcap -T fields -e _ws.col.Protocol | sort | uniq -c | sort -nr
  • ۳
tshark -r dump.pcap -q -z conv,ip
  • ۴
tshark -r dump.pcap -q -z io,stat,0
  • ۵ بدون استفاده از tshark. استفاده از python

محتوای فایل pcap_proto_summary.py:

#!/usr/bin/env python3
import sys
import struct
from collections import defaultdict, Counter

def read_global_header(f):
    gh = f.read(24)
    if len(gh) < 24:
        raise ValueError("Not a valid pcap (global header too short)")
    magic_be = struct.unpack(">I", gh[0:4])[0]
    magic_le = struct.unpack("<I", gh[0:4])[0]
    if magic_be in (0xa1b2c3d4, 0xa1b23c4d):
        endian = ">"
    elif magic_le in (0xa1b2c3d4, 0xa1b23c4d, 0xd4c3b2a1, 0x4d3cb2a1):
        endian = "<"
    else:
        endian = "<"
    _, _, _, _, snaplen, network = struct.unpack(endian + "HHiiii", gh[4:24])
    return endian, snaplen, network

ETHERTYPE_IPv4 = 0x0800
ETHERTYPE_ARP  = 0x0806
ETHERTYPE_IPv6 = 0x86DD

L4_NAMES = {1:"ICMP", 6:"TCP", 17:"UDP", 58:"ICMPv6"}

def parse_pcap(path):
    totals = {"packets": 0, "bytes_incl": 0}
    l2_counts = Counter(); l2_bytes = defaultdict(int)
    l4_counts = Counter(); l4_bytes = defaultdict(int)

    with open(path, "rb") as f:
        endian, snaplen, linktype = read_global_header(f)

        while True:
            hdr = f.read(16)
            if len(hdr) < 16: break
            ts_sec, ts_usec, incl_len, orig_len = struct.unpack(endian + "IIII", hdr)
            pkt = f.read(incl_len)
            if len(pkt) < incl_len: break

            totals["packets"] += 1
            totals["bytes_incl"] += incl_len

            if linktype == 1:  # Ethernet
                if len(pkt) < 14:
                    l2_counts["Truncated"] += 1; l2_bytes["Truncated"] += incl_len; continue
                ethertype = (pkt[12] << 8) | pkt[13]
                if ethertype == ETHERTYPE_ARP:
                    l2_counts["ARP"] += 1; l2_bytes["ARP"] += incl_len
                elif ethertype == ETHERTYPE_IPv4:
                    l2_counts["IPv4"] += 1; l2_bytes["IPv4"] += incl_len
                    ip_off = 14
                    if len(pkt) >= ip_off + 20 and (pkt[ip_off] >> 4) == 4:
                        ihl = (pkt[ip_off] & 0x0F) * 4
                        if len(pkt) >= ip_off + ihl:
                            total_len = (pkt[ip_off+2] << 8) | pkt[ip_off+3]
                            proto = pkt[ip_off+9]
                            name = L4_NAMES.get(proto, f"IP proto {proto}")
                            l4_counts[name] += 1
                            l4_bytes[name] += min(total_len + 14, incl_len)
                elif ethertype == ETHERTYPE_IPv6:
                    l2_counts["IPv6"] += 1; l2_bytes["IPv6"] += incl_len
                    ip6_off = 14
                    if len(pkt) >= ip6_off + 40 and (pkt[ip6_off] >> 4) == 6:
                        payload_len = (pkt[ip6_off+4] << 8) | pkt[ip6_off+5]
                        next_header = pkt[ip6_off+6]
                        name = L4_NAMES.get(next_header, f"IP proto {next_header}")
                        l4_counts[name] += 1
                        l4_bytes[name] += min(40 + payload_len + 14, incl_len)
                else:
                    key = f"EtherType 0x{ethertype:04x}"
                    l2_counts[key] += 1; l2_bytes[key] += incl_len
            else:
                key = f"LinkType {linktype}"
                l2_counts[key] += 1; l2_bytes[key] += incl_len

    return totals, l2_counts, l2_bytes, l4_counts, l4_bytes

def human_bytes(n):
    for unit in ["B","KB","MB","GB"]:
        if n < 1024.0 or unit == "GB":
            return f"{n:.2f} {unit}"
        n /= 1024.0

def main():
    if len(sys.argv) < 2:
        print("Usage: pcap_proto_summary.py <file.pcap>")
        sys.exit(1)
    path = sys.argv[1]
    totals, l2_counts, l2_bytes, l4_counts, l4_bytes = parse_pcap(path)

    print("=== Totals ===")
    print(f"Packets: {totals['packets']}")
    print(f"Bytes (captured): {totals['bytes_incl']} ({human_bytes(totals['bytes_incl'])})")
    print()
    print("=== L3 / L2 breakdown ===")
    for k, v in l2_counts.most_common():
        print(f"{k:20s}  pkts={v:6d}  bytes={l2_bytes[k]} ({human_bytes(l2_bytes[k])})")
    print()
    print("=== L4 breakdown (inside IPv4/IPv6) ===")
    for k, v in l4_counts.most_common():
        print(f"{k:20s}  pkts={v:6d}  bytes~={l4_bytes[k]} ({human_bytes(l4_bytes[k])})")

if __name__ == "__main__":
    main()

طرز استفاده:

python3 pcap_proto_summary.py dump.pcap