From c0b6d7e4282e7d502b69edc72f440dcd865193be Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 14 Jun 2021 20:52:26 +0300 Subject: [PATCH] Add AF_PACKET engine --- capture/af_packet.go | 180 +++++++++++++++++++++++++++++++++++++++++++ capture/capture.go | 40 +++++++++- 2 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 capture/af_packet.go diff --git a/capture/af_packet.go b/capture/af_packet.go new file mode 100644 index 00000000..55ade129 --- /dev/null +++ b/capture/af_packet.go @@ -0,0 +1,180 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "runtime/pprof" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/afpacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "golang.org/x/net/bpf" + + _ "github.com/google/gopacket/layers" +) + +var ( + iface = flag.String("i", "any", "Interface to read from") + cpuprofile = flag.String("cpuprofile", "", "If non-empty, write CPU profile here") + snaplen = flag.Int("s", 0, "Snaplen, if <= 0, use 65535") + bufferSize = flag.Int("b", 8, "Interface buffersize (MB)") + filter = flag.String("f", "port not 22", "BPF filter") + count = flag.Int64("c", -1, "If >= 0, # of packets to capture before returning") + verbose = flag.Int64("log_every", 1, "Write a log every X packets") + addVLAN = flag.Bool("add_vlan", false, "If true, add VLAN header") +) + +type afpacketHandle struct { + TPacket *afpacket.TPacket +} + +func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int, + useVLAN bool, timeout time.Duration) (*afpacketHandle, error) { + + h := &afpacketHandle{} + var err error + + if device == "any" { + h.TPacket, err = afpacket.NewTPacket( + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } else { + h.TPacket, err = afpacket.NewTPacket( + afpacket.OptInterface(device), + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } + return h, err +} + +// ZeroCopyReadPacketData satisfies ZeroCopyPacketDataSource interface +func (h *afpacketHandle) ZeroCopyReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) { + return h.TPacket.ZeroCopyReadPacketData() +} + +// SetBPFFilter translates a BPF filter string into BPF RawInstruction and applies them. +func (h *afpacketHandle) SetBPFFilter(filter string, snaplen int) (err error) { + pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, snaplen, filter) + if err != nil { + return err + } + bpfIns := []bpf.RawInstruction{} + for _, ins := range pcapBPF { + bpfIns2 := bpf.RawInstruction{ + Op: ins.Code, + Jt: ins.Jt, + Jf: ins.Jf, + K: ins.K, + } + bpfIns = append(bpfIns, bpfIns2) + } + if h.TPacket.SetBPF(bpfIns); err != nil { + return err + } + return h.TPacket.SetBPF(bpfIns) +} + +// LinkType returns ethernet link type. +func (h *afpacketHandle) LinkType() layers.LinkType { + return layers.LinkTypeEthernet +} + +// Close will close afpacket source. +func (h *afpacketHandle) Close() { + h.TPacket.Close() +} + +// SocketStats prints received, dropped, queue-freeze packet stats. +func (h *afpacketHandle) SocketStats() (as afpacket.SocketStats, asv afpacket.SocketStatsV3, err error) { + return h.TPacket.SocketStats() +} + +// afpacketComputeSize computes the block_size and the num_blocks in such a way that the +// allocated mmap buffer is close to but smaller than target_size_mb. +// The restriction is that the block_size must be divisible by both the +// frame size and page size. +func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) ( + frameSize int, blockSize int, numBlocks int, err error) { + + if snaplen < pageSize { + frameSize = pageSize / (pageSize / snaplen) + } else { + frameSize = (snaplen/pageSize + 1) * pageSize + } + + // 128 is the default from the gopacket library so just use that + blockSize = frameSize * 128 + numBlocks = (targetSizeMb * 1024 * 1024) / blockSize + + if numBlocks == 0 { + return 0, 0, 0, fmt.Errorf("Interface buffersize is too small") + } + + return frameSize, blockSize, numBlocks, nil +} + +func main() { + flag.Parse() + if *cpuprofile != "" { + log.Printf("Writing CPU profile to %q", *cpuprofile) + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal(err) + } + defer pprof.StopCPUProfile() + } + log.Printf("Starting on interface %q", *iface) + if *snaplen <= 0 { + *snaplen = 65535 + } + szFrame, szBlock, numBlocks, err := afpacketComputeSize(*bufferSize, *snaplen, os.Getpagesize()) + if err != nil { + log.Fatal(err) + } + afpacketHandle, err := newAfpacketHandle(*iface, szFrame, szBlock, numBlocks, *addVLAN, pcap.BlockForever) + if err != nil { + log.Fatal(err) + } + err = afpacketHandle.SetBPFFilter(*filter, *snaplen) + if err != nil { + log.Fatal(err) + } + source := gopacket.ZeroCopyPacketDataSource(afpacketHandle) + defer afpacketHandle.Close() + + bytes := uint64(0) + packets := uint64(0) + for ; *count != 0; *count-- { + data, _, err := source.ZeroCopyReadPacketData() + if err != nil { + log.Fatal(err) + } + bytes += uint64(len(data)) + packets++ + if *count%*verbose == 0 { + _, afpacketStats, err := afpacketHandle.SocketStats() + if err != nil { + log.Println(err) + } + log.Printf("Read in %d bytes in %d packets", bytes, packets) + log.Printf("Stats {received dropped queue-freeze}: %d", afpacketStats) + } + } +} diff --git a/capture/capture.go b/capture/capture.go index f8a885a7..808e7ba4 100644 --- a/capture/capture.go +++ b/capture/capture.go @@ -65,6 +65,7 @@ const ( EnginePcap EngineType = 1 << iota EnginePcapFile EngineRawSocket + EngineAFPacket ) // Set is here so that EngineType can implement flag.Var @@ -74,8 +75,10 @@ func (eng *EngineType) Set(v string) error { *eng = EnginePcap case "pcap_file": *eng = EnginePcapFile - case "raw_socket", "af_packet": + case "raw_socket": *eng = EngineRawSocket + case "af_packet": + *eng = EngineAFPacket default: return fmt.Errorf("invalid engine %s", v) } @@ -90,6 +93,8 @@ func (eng *EngineType) String() (e string) { e = "libpcap" case EngineRawSocket: e = "raw_socket" + case EngineAFPacket: + e = "af_packet" default: e = "" } @@ -124,6 +129,9 @@ func NewListener(host string, ports []uint16, transport string, engine EngineTyp case EngineRawSocket: l.Engine = EngineRawSocket l.Activate = l.activateRawSocket + case EngineAFPacket: + l.Engine = EngineAFPacket + l.Activate = l.activateAFPacket case EnginePcapFile: l.Engine = EnginePcapFile l.Activate = l.activatePcapFile @@ -403,6 +411,36 @@ func (l *Listener) activatePcap() error { return nil } +func (l *Listener) activateAFPacket() error { + var e error + var msg string + + szFrame, szBlock, numBlocks, err := afpacketComputeSize(*bufferSize, *snaplen, os.Getpagesize()) + if l.host == "" || l.host == "any" { + handle, err := newAfpacketHandle("any", szFrame, szBlock, numBlocks, false, pcap.BlockForever) + if err != nil { + return err + } + return nil + } + + for _, ifi := range l.Interfaces { + var handle *pcap.Handle + + handle, err := newAfpacketHandle(ifi.Name, szFrame, szBlock, numBlocks, false, pcap.BlockForever) + + if err != nil { + msg += ("\n" + err.Error()) + continue + } + l.Handles[ifi.Name] = handle + } + if len(l.Handles) == 0 { + return fmt.Errorf("pcap handles error:%s", msg) + } + return nil +} + func (l *Listener) activateRawSocket() error { if runtime.GOOS != "linux" { return fmt.Errorf("sock_raw is not stabilized on OS other than linux")