Skip to content

Commit

Permalink
feat: support erspan3 in analyzer mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Dec 4, 2024
1 parent 6246cd0 commit b9baf4b
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 35 deletions.
92 changes: 91 additions & 1 deletion agent/src/common/decapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ const LE_IPV6_PROTO_TYPE_I: u16 = 0xDD86; // 0x86dd's LittleEndian
const LE_ERSPAN_PROTO_TYPE_II: u16 = 0xBE88; // 0x88BE's LittleEndian
const LE_ERSPAN_PROTO_TYPE_III: u16 = 0xEB22; // 0x22EB's LittleEndian
const LE_VXLAN_PROTO_UDP_DPORT: u16 = 0xB512; // 0x12B5(4789)'s LittleEndian
const LE_GPE_VXLAN_PROTO_UDP_DPORT: u16 = 0xB612; // 0x12B6(4790)'s LittleEndian
const LE_VXLAN_PROTO_UDP_DPORT2: u16 = 0x1821; // 0x2118(8472)'s LittleEndian
const LE_VXLAN_PROTO_UDP_DPORT3: u16 = 0x801A; // 0x1A80(6784)'s LittleEndian
const LE_TRANSPARENT_ETHERNET_BRIDGEING: u16 = 0x5865; // 0x6558(25944)'s LittleEndian
Expand All @@ -180,6 +181,7 @@ pub struct TunnelInfo {
pub tunnel_type: TunnelType,
pub tier: u8,
pub is_ipv6: bool,
pub from: u32, // tunnel source ip
}

impl Default for TunnelInfo {
Expand All @@ -193,18 +195,30 @@ impl Default for TunnelInfo {
tunnel_type: TunnelType::default(),
tier: 0,
is_ipv6: false,
from: 0,
}
}
}

impl TunnelInfo {
pub fn reset_and_retain_erspan_from(&mut self) {
let from = if self.tunnel_type == TunnelType::Erspan {
self.from
} else {
0
};
*self = Default::default();
self.from = from;
}

fn decapsulate_addr(&mut self, l3_packet: &[u8]) {
self.src = Ipv4Addr::from(bytes::read_u32_be(
&l3_packet[FIELD_OFFSET_SIP - ETH_HEADER_SIZE..],
));
self.dst = Ipv4Addr::from(bytes::read_u32_be(
&l3_packet[FIELD_OFFSET_DIP - ETH_HEADER_SIZE..],
));
self.from = u32::from_be_bytes(self.src.octets());
}

fn decapsulate_mac(&mut self, packet: &[u8]) {
Expand All @@ -219,7 +233,7 @@ impl TunnelInfo {

pub fn decapsulate_udp(
&mut self,
packet: &[u8],
packet: &mut [u8],
l2_len: usize,
tunnel_types: &TunnelTypeBitmap,
) -> usize {
Expand All @@ -235,13 +249,89 @@ impl TunnelInfo {
{
self.decapsulate_vxlan(packet, l2_len)
}
LE_GPE_VXLAN_PROTO_UDP_DPORT if tunnel_types.has(TunnelType::Vxlan) => {
self.decapsulate_gpe_vxlan(packet, l2_len)
}
LE_GENEVE_PROTO_UDP_DPORT if tunnel_types.has(TunnelType::Geneve) => {
self.decapsulate_geneve(packet, l2_len)
}
_ => 0,
}
}

// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|R|Ver|I|P|B|O| Reserved |Next Protocol |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | VXLAN Network Identifier (VNI) | Reserved |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
pub fn decapsulate_gpe_vxlan(&mut self, packet: &mut [u8], l2_len: usize) -> usize {
let l3_packet = &mut packet[l2_len..];
if l3_packet.len() <= 28 {
return 0;
}
let l4_packet = &l3_packet[28..];
if l4_packet.len() <= 8 {
return 0;
}
let mut offset = 28;

// vxlan
let flags = l4_packet[0];
// version
if flags & 0x30 != 0 {
return 0;
}
// instance & protocol
if flags & 0x0c != 0x0c {
return 0;
}
// protocol
let protocol = l4_packet[3];
if protocol != 4 {
return 0;
}
// id
let id = bytes::read_u32_be(&l4_packet[4..]);
offset += 8;

// nsh
let vxlan_packet = &l4_packet[8..];
if vxlan_packet.len() <= 8 {
return 0;
}
let flags = bytes::read_u16_be(&vxlan_packet[0..]);
// version & o & c
if flags & 0xf000 != 0 {
return 0;
}
let length = (flags & 0x3f) << 2;
if vxlan_packet.len() <= length as usize {
return 0;
}
let protocol = vxlan_packet[3];
// only ipv4
if protocol != 1 {
return 0;
}
offset += length as usize;
offset -= 14;

// overlay eth
let macs = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x8, 0];
l3_packet[offset..offset + 14].copy_from_slice(&macs[..]);

if self.tier == 0 {
self.decapsulate_addr(l3_packet);
self.decapsulate_mac(packet);
self.tunnel_type = TunnelType::Vxlan;
self.id = id;
}

offset
}

pub fn decapsulate_vxlan(&mut self, packet: &[u8], l2_len: usize) -> usize {
let l3_packet = &packet[l2_len..];
if l3_packet.len() < FIELD_OFFSET_VXLAN_FLAGS + VXLAN_HEADER_SIZE {
Expand Down
5 changes: 3 additions & 2 deletions agent/src/common/tap_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ impl TapPort {
)
}

pub fn from_id(tunnel_type: TunnelType, id: u32) -> Self {
pub fn from_id(tunnel_type: TunnelType, id: u32, tunnel_from: u32) -> Self {
Self(
id as u64
((id as u64) & 0xff)
| ((tunnel_from as u64) & 0xffffff00)
| ((tunnel_type as u64 & Self::TUNNEL_TYPE_MASK) << Self::TUNNEL_TYPE_OFFSET)
| ((Self::FROM_ID as u64) << Self::FROM_OFFSET),
)
Expand Down
2 changes: 1 addition & 1 deletion agent/src/dispatcher/analyzer_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl AnalyzerModeDispatcher {
Some(vmac) => (true, u64::from(*vmac) as u32),
None => (false, 0),
};
let mut tap_port = TapPort::from_id(tunnel_info.tunnel_type, id as u32);
let mut tap_port = TapPort::from_id(tunnel_info.tunnel_type, id as u32, tunnel_info.from);
let is_unicast = tunnel_info.tier > 0 || !MacAddr::is_multicast(overlay_packet); // Consider unicast when there is a tunnel

if src_remote && dst_remote && is_unicast {
Expand Down
3 changes: 2 additions & 1 deletion agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl BaseDispatcher {
// vxlan-erspan:隧道信息为空
// erspan-vxlan;隧道信息为vxlan,隧道层数为1
// erspan-vxlan-erspan;隧道信息为空
*tunnel_info = Default::default();
tunnel_info.reset_and_retain_erspan_from();
}
decap_len += offset;
}
Expand Down Expand Up @@ -709,6 +709,7 @@ impl BaseDispatcherListener {
proxy_controller_port: self.proxy_controller_port,
analyzer_source_ip: source_ip.unwrap(),
analyzer_port: self.analyzer_port,
ignore_npb: options.tap_mode != TapMode::Analyzer,
};

let mut bpf_options = self.bpf_options.lock().unwrap();
Expand Down
54 changes: 32 additions & 22 deletions agent/src/dispatcher/recv_engine/bpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type JumpModifier = fn(jumpIf: JumpIf, index: usize, total: usize) -> JumpIf;
struct BpfBuilder {
ins: Vec<BpfSyntax>,
modifiers: Vec<Option<JumpModifier>>,
ignore_npb: bool,
}

#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -88,6 +89,7 @@ pub(crate) struct Builder {
pub proxy_controller_port: u16,
pub controller_tls_port: u16,
pub analyzer_source_ip: IpAddr,
pub ignore_npb: bool,
}

#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -725,7 +727,9 @@ impl Builder {
// 不采集和TSDB通信的流量
bpf_builder.appends(&mut self.skip_ipv4_tsdb());
// 不采集分发流量
bpf_builder.appends(&mut self.skip_ipv4_npb());
if self.ignore_npb {
bpf_builder.appends(&mut self.skip_ipv4_npb());
}

return bpf_builder.build();
}
Expand All @@ -736,7 +740,9 @@ impl Builder {
// 不采集和TSDB通信的流量
bpf_builder.appends(&mut self.skip_ipv6_tsdb());
// 不采集分发流量
bpf_builder.appends(&mut self.skip_ipv6_npb());
if self.ignore_npb {
bpf_builder.appends(&mut self.skip_ipv6_npb());
}

return bpf_builder.build();
}
Expand Down Expand Up @@ -779,26 +785,28 @@ impl Builder {
ip_version, self.analyzer_source_ip, self.analyzer_port
));

// 不采集分发的VXLAN流量
conditions.push(format!(
"not (udp and dst port {} and udp[8:1]={:#x})",
self.npb_port, self.vxlan_flags
));

// 不采集分发的TCP流量
conditions.push(format!("not (tcp and port {})", self.npb_port,));

// 不采集分发的ERSPANIII
conditions.push(format!(
"not (ip[9:1]={:#x} and ip[22:2]={:#x})",
u8::from(IpProtocol::GRE),
GRE_PROTO_ERSPAN_III
));
conditions.push(format!(
"not (ip6[6:1]={:#x} and ip6[42:2]={:#x})",
u8::from(IpProtocol::GRE),
GRE_PROTO_ERSPAN_III
));
if self.ignore_npb {
// 不采集分发的VXLAN流量
conditions.push(format!(
"not (udp and dst port {} and udp[8:1]={:#x})",
self.npb_port, self.vxlan_flags
));

// 不采集分发的TCP流量
conditions.push(format!("not (tcp and port {})", self.npb_port,));

// 不采集分发的ERSPANIII
conditions.push(format!(
"not (ip[9:1]={:#x} and ip[22:2]={:#x})",
u8::from(IpProtocol::GRE),
GRE_PROTO_ERSPAN_III
));
conditions.push(format!(
"not (ip6[6:1]={:#x} and ip6[42:2]={:#x})",
u8::from(IpProtocol::GRE),
GRE_PROTO_ERSPAN_III
));
}

conditions.join(" and ")
}
Expand All @@ -820,6 +828,7 @@ mod tests {
proxy_controller_port: 7788,
analyzer_port: 8899,
analyzer_source_ip: "1.2.3.4".parse::<IpAddr>().unwrap(),
ignore_npb: true,
};

let syntax = builder.build_pcap_syntax();
Expand Down Expand Up @@ -909,6 +918,7 @@ mod tests {
analyzer_source_ip: "9999:aaaa:bbbb:cccc:dddd:eeee:ffff:0000"
.parse::<IpAddr>()
.unwrap(),
ignore_npb: true,
};

let syntax = builder.build_pcap_syntax();
Expand Down
5 changes: 5 additions & 0 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
use ahash::AHashMap;
use log::{debug, warn};
use lru::LruCache;
use pnet::packet::icmp::IcmpTypes;

use super::{
app_table::AppTable,
Expand Down Expand Up @@ -1315,6 +1316,10 @@ impl FlowMap {
node.tagged_flow.flow.reverse(true);
}
node.tagged_flow.flow.direction_score = direction_score;
} else if let ProtocolData::IcmpData(icmp_data) = &meta_packet.protocol_data {
if icmp_data.icmp_type == IcmpTypes::EchoReply.0 {
node.tagged_flow.flow.reverse(true);
}
}

/*
Expand Down
10 changes: 2 additions & 8 deletions agent/src/flow_generator/flow_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,19 +277,13 @@ impl FlowNode {
&& flow_key.port_src == meta_lookup_key.src_port
&& flow_key.port_dst == meta_lookup_key.dst_port
{
// l3 protocols, such as icmp, can determine the direction of packets according
// to icmp type, so there is no need to correct the direction of packets
if meta_lookup_key.is_tcp() || meta_lookup_key.is_udp() {
meta_packet.lookup_key.direction = PacketDirection::ClientToServer;
}
meta_packet.lookup_key.direction = PacketDirection::ClientToServer;
} else if flow_key.ip_src == meta_lookup_key.dst_ip
&& flow_key.ip_dst == meta_lookup_key.src_ip
&& flow_key.port_src == meta_lookup_key.dst_port
&& flow_key.port_dst == meta_lookup_key.src_port
{
if meta_lookup_key.is_tcp() || meta_lookup_key.is_udp() {
meta_packet.lookup_key.direction = PacketDirection::ServerToClient;
}
meta_packet.lookup_key.direction = PacketDirection::ServerToClient;
} else {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,7 @@ impl AgentComponents {
proxy_controller_port: candidate_config.dispatcher.proxy_controller_port,
analyzer_source_ip: source_ip,
analyzer_port: candidate_config.dispatcher.analyzer_port,
ignore_npb: candidate_config.tap_mode != TapMode::Analyzer,
};
let bpf_syntax_str = bpf_builder.build_pcap_syntax_to_str();
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down

0 comments on commit b9baf4b

Please sign in to comment.