| Crates.io | netflow_parser |
| lib.rs | netflow_parser |
| version | 0.8.3 |
| created_at | 2023-08-28 03:55:12.817818+00 |
| updated_at | 2026-01-05 22:04:36.920914+00 |
| description | Parser for Netflow Cisco V5, V7, V9, IPFIX |
| homepage | |
| repository | https://github.com/mikemiles-dev/netflow_parser/ |
| max_upload_size | |
| id | 956573 |
| size | 9,817,456 |
A Netflow Parser library for Cisco V5, V7, V9, and IPFIX written in Rust. Supports chaining of multiple versions in the same stream.
⚠️ Multi-Router Deployments: Use AutoScopedParser instead of NetflowParser when parsing from multiple routers to prevent template cache collisions. See the Template Management Guide for details.
use netflow_parser::{NetflowParser, NetflowPacket};
// 0000 00 05 00 01 03 00 04 00 05 00 06 07 08 09 00 01 ................
// 0010 02 03 04 05 06 07 08 09 00 01 02 03 04 05 06 07 ................
// 0020 08 09 00 01 02 03 04 05 06 07 08 09 00 01 02 03 ................
// 0030 04 05 06 07 08 09 00 01 02 03 04 05 06 07 08 09 ................
// 0040 00 01 02 03 04 05 06 07 ........
let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
let result = NetflowParser::default().parse_bytes(&v5_packet);
match result.packets.first() {
Some(NetflowPacket::V5(v5)) => assert_eq!(v5.header.version, 5),
_ => (),
}
// Check for errors
if let Some(e) = result.error {
eprintln!("Parse error: {}", e);
}
Structures fully support serialization. Below is an example using the serde_json macro:
use serde_json::json;
use netflow_parser::NetflowParser;
// 0000 00 05 00 01 03 00 04 00 05 00 06 07 08 09 00 01 ................
// 0010 02 03 04 05 06 07 08 09 00 01 02 03 04 05 06 07 ................
// 0020 08 09 00 01 02 03 04 05 06 07 08 09 00 01 02 03 ................
// 0030 04 05 06 07 08 09 00 01 02 03 04 05 06 07 08 09 ................
// 0040 00 01 02 03 04 05 06 07 ........
let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
let result = NetflowParser::default().parse_bytes(&v5_packet);
println!("{}", json!(result.packets).to_string());
[
{
"V5": {
"header": {
"count": 1,
"engine_id": 7,
"engine_type": 6,
"flow_sequence": 33752069,
"sampling_interval": 2057,
"sys_up_time": { "nanos": 672000000, "secs": 50332 },
"unix_nsecs": 134807553,
"unix_secs": 83887623,
"version": 5
},
"sets": [
{
"d_octets": 66051,
"d_pkts": 101124105,
"dst_addr": "4.5.6.7",
"dst_as": 515,
"dst_mask": 5,
"dst_port": 1029,
"first": { "nanos": 87000000, "secs": 67438 },
"input": 515,
"last": { "nanos": 553000000, "secs": 134807 },
"next_hop": "8.9.0.1",
"output": 1029,
"pad1": 6,
"pad2": 1543,
"protocol_number": 8,
"protocol_type": "Egp",
"src_addr": "0.1.2.3",
"src_as": 1,
"src_mask": 4,
"src_port": 515,
"tcp_flags": 7,
"tos": 9
}
]
}
}
]
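Because the parsed structures implement serde's Serialize, any serde serializer can be used. A small sketch, reusing result from the example above, with serde_json::to_string_pretty:
use serde_json::to_string_pretty;
// Pretty-print the same parsed packets instead of using the json! macro.
let pretty = to_string_pretty(&result.packets).expect("serialization failed");
println!("{}", pretty);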
use netflow_parser::{NetflowParser, NetflowPacket};
// 0000 00 05 00 01 03 00 04 00 05 00 06 07 08 09 00 01 ................
// 0010 02 03 04 05 06 07 08 09 00 01 02 03 04 05 06 07 ................
// 0020 08 09 00 01 02 03 04 05 06 07 08 09 00 01 02 03 ................
// 0030 04 05 06 07 08 09 00 01 02 03 04 05 06 07 08 09 ................
// 0040 00 01 02 03 04 05 06 07 ........
let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
let result = NetflowParser::default().parse_bytes(&v5_packet);
let v5_parsed: Vec<NetflowPacket> = result.packets.into_iter().filter(|p| p.is_v5()).collect();
You can use the iterator API to process packets one-by-one as they're parsed, instead of collecting them into a Vec:
use netflow_parser::{NetflowParser, NetflowPacket};
let buffer = /* your netflow data */;
let mut parser = NetflowParser::default();
// Process packets without collecting into a Vec
for result in parser.iter_packets(&buffer) {
match result {
Ok(NetflowPacket::V5(v5)) => {
// Process V5 packet
println!("V5 packet from {}", v5.header.version);
}
Ok(NetflowPacket::V9(v9)) => {
// Process V9 packet
for flowset in &v9.flowsets {
// Handle flowsets
}
}
Ok(NetflowPacket::IPFix(ipfix)) => {
// Process IPFIX packet
}
Err(e) => {
eprintln!("Parse error: {:?}", e);
}
_ => {}
}
}
The iterator provides access to unconsumed bytes for advanced use cases:
use netflow_parser::NetflowParser;
let buffer = /* your netflow data */;
let mut parser = NetflowParser::default();
let mut iter = parser.iter_packets(&buffer);
while let Some(packet) = iter.next() {
// Process packet
}
// Check if all bytes were consumed
if !iter.is_complete() {
println!("Warning: {} bytes remain unconsumed", iter.remaining().len());
}
- Avoids collecting every packet into an intermediate Vec
- Works with standard iterator adapters (.filter(), .map(), .take(), etc.)
- Access unconsumed bytes with .remaining() and check completion with .is_complete()
// Count V5 packets without collecting
let count = parser.iter_packets(&buffer)
.filter(|r| r.as_ref().map(|p| p.is_v5()).unwrap_or(false))
.count();
// Process only the first 10 packets
for result in parser.iter_packets(&buffer).take(10) {
if let Ok(packet) = result {
// Handle packet
}
}
// Collect only if needed (equivalent to parse_bytes())
let packets: Vec<_> = parser.iter_packets(&buffer)
.filter_map(Result::ok)
.collect();
// Check unconsumed bytes (useful for mixed protocol streams)
let mut iter = parser.iter_packets(&buffer);
for result in &mut iter {
if let Ok(packet) = result {
// Process packet
}
}
if !iter.is_complete() {
let remaining = iter.remaining();
// Handle non-netflow data at end of buffer
}
The NetflowParser can be configured using the builder pattern to customize behavior for your specific use case.
use netflow_parser::NetflowParser;
// Create parser with default settings
let parser = NetflowParser::default();
// Or use the builder for custom configuration
let parser = NetflowParser::builder()
.build()
.expect("Failed to build parser");
V9 and IPFIX parsers use LRU (Least Recently Used) caching to store templates. Configure the cache size to prevent memory exhaustion while maintaining good performance:
use netflow_parser::NetflowParser;
// Configure both V9 and IPFIX parsers with the same cache size
let parser = NetflowParser::builder()
.with_cache_size(2000) // Default is 1000
.build()
.expect("Failed to build parser");
// Configure V9 and IPFIX independently
let parser = NetflowParser::builder()
.with_v9_cache_size(1000)
.with_ipfix_cache_size(5000)
.build()
.expect("Failed to build parser");
Cache Behavior:
- When parsing from multiple sources, use RouterScopedParser (see Template Management section)
Configure the maximum number of fields allowed per template to prevent DoS attacks via malicious packets with excessive field counts:
use netflow_parser::NetflowParser;
// Configure both V9 and IPFIX parsers with the same limit
let parser = NetflowParser::builder()
.with_max_field_count(5000) // Default is 10,000
.build()
.expect("Failed to build parser");
// Configure V9 and IPFIX independently
let parser = NetflowParser::builder()
.with_v9_max_field_count(5000)
.with_ipfix_max_field_count(15000)
.build()
.expect("Failed to build parser");
Security Considerations:
Additional Security Validations: The parser also automatically validates:
- Total template size (Config::max_template_total_size)
- field_type_number values
- (field_type_number, enterprise_number) pairs
⚠️ Breaking Change in v0.7.0: Packet-based and combined TTL modes have been removed. Only time-based TTL is now supported. See RELEASES.md for the migration guide.
Optionally configure templates to expire after a time duration. This is useful for:
Note: TTL is disabled by default. Templates persist until LRU eviction unless explicitly configured.
use netflow_parser::NetflowParser;
use netflow_parser::variable_versions::ttl::TtlConfig;
use std::time::Duration;
// Templates expire after 2 hours
let parser = NetflowParser::builder()
.with_cache_size(1000)
.with_ttl(TtlConfig::new(Duration::from_secs(2 * 3600)))
.build()
.unwrap();
// Using default TTL (2 hours)
let parser = NetflowParser::builder()
.with_cache_size(1000)
.with_ttl(TtlConfig::default())
.build()
.unwrap();
// Different TTL for V9 and IPFIX
let parser = NetflowParser::builder()
.with_v9_ttl(TtlConfig::new(Duration::from_secs(3600)))
.with_ipfix_ttl(TtlConfig::new(Duration::from_secs(2 * 3600)))
.build()
.unwrap();
If you only care about specific NetFlow versions, configure allowed versions:
use netflow_parser::NetflowParser;
// Only parse V5 and V9 packets
let parser = NetflowParser::builder()
.with_allowed_versions([5, 9].into())
.build()
.expect("Failed to build parser");
// Or set directly on an existing parser
let mut parser = NetflowParser::default();
parser.allowed_versions = [7, 9].into();
Packets with versions not in the allowed list are ignored (an empty Vec is returned).
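A minimal sketch of this filtering behavior, reusing the v5_packet bytes from the earlier examples:
use netflow_parser::NetflowParser;
// Only V9 and IPFIX are allowed, so the V5 datagram is skipped.
let mut parser = NetflowParser::builder()
    .with_allowed_versions([9, 10].into())
    .build()
    .expect("Failed to build parser");
let result = parser.parse_bytes(&v5_packet);
assert!(result.packets.is_empty());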
parse_bytes() returns ParseResult to preserve partially parsed packets when errors occur mid-stream:
use netflow_parser::{NetflowParser, ParseResult};
let result = parser.parse_bytes(&buffer);
// Always get successfully parsed packets, even if an error occurred later
for packet in result.packets {
// Process packet
}
// Check for errors
if let Some(e) = result.error {
eprintln!("Error after {} packets: {}", result.packets.len(), e);
}
iter_packets() yields Result<NetflowPacket, NetflowError> for per-packet error handling:
// Per-packet error handling
for result in parser.iter_packets(&buffer) {
match result {
Ok(packet) => { /* process */ }
Err(e) => eprintln!("Error: {}", e),
}
}
Error types: Incomplete, UnsupportedVersion, Partial, MissingTemplate, ParseError. All implement Display and std::error::Error.
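Because the error type implements std::error::Error, it can be boxed and propagated with ?. A small sketch using a hypothetical helper function:
use std::error::Error;
use netflow_parser::NetflowParser;
// Hypothetical helper: stop at the first parse error and bubble it up.
fn count_packets(data: &[u8]) -> Result<usize, Box<dyn Error>> {
    let mut parser = NetflowParser::default();
    let mut count = 0;
    for result in parser.iter_packets(data) {
        let _packet = result?; // the parse error converts into Box<dyn Error>
        count += 1;
    }
    Ok(count)
}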
To prevent memory exhaustion from malformed packets, the parser limits the size of error buffer samples. By default, only the first 256 bytes of unparseable data are stored in error messages:
use netflow_parser::NetflowParser;
// Recommended: Use builder pattern (automatically configures all parsers)
let parser = NetflowParser::builder()
.with_max_error_sample_size(512) // Default is 256 bytes
.build()
.expect("Failed to build parser");
// Or configure directly on an existing parser (requires manual sync)
let mut parser = NetflowParser::default();
parser.max_error_sample_size = 512;
parser.v9_parser.max_error_sample_size = 512;
parser.ipfix_parser.max_error_sample_size = 512;
This setting helps prevent memory exhaustion when processing malformed or malicious packets while still providing enough context for debugging.
What changed: Two major improvements to error handling:
- parse_bytes() now returns ParseResult to preserve partial results on errors
- The NetflowPacket::Error variant was removed; errors now use Result
ParseResult (prevents data loss):
// ❌ Old (0.7.x) - loses packets 1-4 if packet 5 errors
let packets = parser.parse_bytes(&data); // Returns Vec<NetflowPacket>
// Silent error: if parsing stopped at packet 5, you lost packets 1-4
// ✅ New (0.8.0) - keep packets 1-4 even if packet 5 errors
let result = parser.parse_bytes(&data); // Returns ParseResult
for packet in result.packets {
// Process successfully parsed packets 1-4
}
if let Some(e) = result.error {
eprintln!("Error at packet 5: {}", e); // But still got partial results!
}
Error Handling (use Result instead of Error variant):
// ❌ Old (0.7.x) - errors inline with packets
for packet in parser.parse_bytes(&data) {
match packet {
NetflowPacket::V5(v5) => { /* process */ }
NetflowPacket::Error(e) => { /* error */ }
_ => {}
}
}
// ✅ New (0.8.0) - use iter_packets() for Result-based errors
for result in parser.iter_packets(&data) {
match result {
Ok(NetflowPacket::V5(v5)) => { /* process */ }
Err(e) => { /* error */ }
_ => {}
}
}
IPFIX supports vendor-specific enterprise fields that extend the standard IANA field set. The library provides built-in support for several vendors (Cisco, VMWare, Netscaler, etc.), but you can also register your own custom enterprise fields:
use netflow_parser::NetflowParser;
use netflow_parser::variable_versions::data_number::FieldDataType;
use netflow_parser::variable_versions::enterprise_registry::EnterpriseFieldDef;
// Register custom enterprise fields for your vendor
let parser = NetflowParser::builder()
.register_enterprise_field(EnterpriseFieldDef::new(
12345, // Your enterprise number (assigned by IANA)
1, // Field number within your enterprise
"customMetric",
FieldDataType::UnsignedDataNumber,
))
.register_enterprise_field(EnterpriseFieldDef::new(
12345,
2,
"customApplicationName",
FieldDataType::String,
))
.build()
.expect("Failed to build parser");
// Parse IPFIX packets - custom fields are automatically decoded!
let packets = parser.parse_bytes(&buffer);
use netflow_parser::NetflowParser;
use netflow_parser::variable_versions::data_number::FieldDataType;
use netflow_parser::variable_versions::enterprise_registry::EnterpriseFieldDef;
let custom_fields = vec![
EnterpriseFieldDef::new(12345, 1, "field1", FieldDataType::UnsignedDataNumber),
EnterpriseFieldDef::new(12345, 2, "field2", FieldDataType::String),
EnterpriseFieldDef::new(12345, 3, "field3", FieldDataType::Ip4Addr),
EnterpriseFieldDef::new(12345, 4, "field4", FieldDataType::DurationMillis),
];
let parser = NetflowParser::builder()
.register_enterprise_fields(custom_fields)
.build()
.expect("Failed to build parser");
When registering enterprise fields, you can use any of these built-in data types:
- FieldDataType::UnsignedDataNumber - Unsigned integers (variable length)
- FieldDataType::SignedDataNumber - Signed integers (variable length)
- FieldDataType::Float64 - 64-bit floating point
- FieldDataType::String - UTF-8 strings
- FieldDataType::Ip4Addr - IPv4 addresses
- FieldDataType::Ip6Addr - IPv6 addresses
- FieldDataType::MacAddr - MAC addresses
- FieldDataType::DurationSeconds - Durations in seconds
- FieldDataType::DurationMillis - Durations in milliseconds
- FieldDataType::DurationMicrosNTP - NTP microsecond timestamps
- FieldDataType::DurationNanosNTP - NTP nanosecond timestamps
- FieldDataType::ProtocolType - Protocol numbers
- FieldDataType::Vec - Raw byte arrays
- FieldDataType::ApplicationId - Application identifiers
How It Works:
- Fields that are not registered are returned as raw bytes (FieldValue::Vec)
- The name parameter is used for debugging and can help identify fields in logs
See examples/custom_enterprise_fields.rs for a complete working example.
use netflow_parser::NetflowParser;
use netflow_parser::variable_versions::ttl::TtlConfig;
use netflow_parser::variable_versions::data_number::FieldDataType;
use netflow_parser::variable_versions::enterprise_registry::EnterpriseFieldDef;
use std::time::Duration;
let parser = NetflowParser::builder()
// Cache configuration
.with_v9_cache_size(1000)
.with_ipfix_cache_size(2000)
// Security limits
.with_v9_max_field_count(5000)
.with_ipfix_max_field_count(10000)
.with_max_error_sample_size(512)
// Template TTL
.with_v9_ttl(TtlConfig::new(Duration::from_secs(3600)))
.with_ipfix_ttl(TtlConfig::new(Duration::from_secs(7200)))
// Version filtering
.with_allowed_versions([5, 9, 10].into())
// Enterprise fields
.register_enterprise_fields(vec![
EnterpriseFieldDef::new(12345, 1, "field1", FieldDataType::UnsignedDataNumber),
EnterpriseFieldDef::new(12345, 2, "field2", FieldDataType::String),
])
// Template lifecycle hooks
.on_template_event(|event| {
println!("Template event: {:?}", event);
})
.build()
.expect("Failed to build parser");
// For multi-source deployments, use AutoScopedParser instead:
// let scoped_parser = NetflowParser::builder()./* config */.multi_source();
We include NetflowCommon and NetflowCommonFlowSet structures.
These let you work with common fields without unpacking values from version-specific types.
If a flow does not have a matching field, it is simply left as None.
use std::net::IpAddr;
use netflow_parser::protocol::ProtocolTypes;
#[derive(Debug, Default)]
pub struct NetflowCommon {
pub version: u16,
pub timestamp: u32,
pub flowsets: Vec<NetflowCommonFlowSet>,
}
#[derive(Debug, Default)]
struct NetflowCommonFlowSet {
src_addr: Option<IpAddr>,
dst_addr: Option<IpAddr>,
src_port: Option<u16>,
dst_port: Option<u16>,
protocol_number: Option<u8>,
protocol_type: Option<ProtocolTypes>,
first_seen: Option<u32>,
last_seen: Option<u32>,
src_mac: Option<String>,
dst_mac: Option<String>,
}
use netflow_parser::{NetflowParser, NetflowPacket};
// 0000 00 05 00 01 03 00 04 00 05 00 06 07 08 09 00 01 ................
// 0010 02 03 04 05 06 07 08 09 00 01 02 03 04 05 06 07 ................
// 0020 08 09 00 01 02 03 04 05 06 07 08 09 00 01 02 03 ................
// 0030 04 05 06 07 08 09 00 01 02 03 04 05 06 07 08 09 ................
// 0040 00 01 02 03 04 05 06 07 ........
let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7];
let result = NetflowParser::default().parse_bytes(&v5_packet);
let netflow_common = result.packets
.first()
.unwrap()
.as_netflow_common()
.unwrap();
for common_flow in netflow_common.flowsets.iter() {
println!("Src Addr: {} Dst Addr: {}", common_flow.src_addr.unwrap(), common_flow.dst_addr.unwrap());
}
To gather all flowsets from all packets into a flattened vector:
use netflow_parser::NetflowParser;
let flowsets = NetflowParser::default().parse_bytes_as_netflow_common_flowsets(&v5_packet);
By default, NetflowCommon maps standard IANA fields to the common structure. However, you can customize which fields are used for V9 and IPFIX packets using configuration structs. This is useful when:
use netflow_parser::netflow_common::{NetflowCommon, V9FieldMappingConfig};
use netflow_parser::variable_versions::v9_lookup::V9Field;
// Create a custom configuration that prefers IPv6 addresses
let mut config = V9FieldMappingConfig::default();
config.src_addr.primary = V9Field::Ipv6SrcAddr;
config.src_addr.fallback = Some(V9Field::Ipv4SrcAddr);
config.dst_addr.primary = V9Field::Ipv6DstAddr;
config.dst_addr.fallback = Some(V9Field::Ipv4DstAddr);
// Use with a parsed V9 packet
// let common = NetflowCommon::from_v9_with_config(&v9_packet, &config);
use netflow_parser::netflow_common::{NetflowCommon, IPFixFieldMappingConfig};
use netflow_parser::variable_versions::ipfix_lookup::{IPFixField, IANAIPFixField};
// Create a custom configuration that prefers IPv6 addresses
let mut config = IPFixFieldMappingConfig::default();
config.src_addr.primary = IPFixField::IANA(IANAIPFixField::SourceIpv6address);
config.src_addr.fallback = Some(IPFixField::IANA(IANAIPFixField::SourceIpv4address));
config.dst_addr.primary = IPFixField::IANA(IANAIPFixField::DestinationIpv6address);
config.dst_addr.fallback = Some(IPFixField::IANA(IANAIPFixField::DestinationIpv4address));
// Use with a parsed IPFIX packet
// let common = NetflowCommon::from_ipfix_with_config(&ipfix_packet, &config);
Both V9FieldMappingConfig and IPFixFieldMappingConfig support configuring:
| Field | Description | Default V9 Field | Default IPFIX Field |
|---|---|---|---|
| src_addr | Source IP address | Ipv4SrcAddr (fallback: Ipv6SrcAddr) | SourceIpv4address (fallback: SourceIpv6address) |
| dst_addr | Destination IP address | Ipv4DstAddr (fallback: Ipv6DstAddr) | DestinationIpv4address (fallback: DestinationIpv6address) |
| src_port | Source port | L4SrcPort | SourceTransportPort |
| dst_port | Destination port | L4DstPort | DestinationTransportPort |
| protocol | Protocol number | Protocol | ProtocolIdentifier |
| first_seen | Flow start time | FirstSwitched | FlowStartSysUpTime |
| last_seen | Flow end time | LastSwitched | FlowEndSysUpTime |
| src_mac | Source MAC address | InSrcMac | SourceMacaddress |
| dst_mac | Destination MAC address | InDstMac | DestinationMacaddress |
Each field mapping has a primary field (always checked first) and an optional fallback field (used if primary is not present in the flow record).
Parsed V5, V7, V9, and IPFIX packets can be re-exported back into bytes.
V9/IPFIX Padding Behavior:
Creating Data Structs:
For convenience, use Data::new() and OptionsData::new() to create data structures without manually specifying padding:
use netflow_parser::variable_versions::ipfix::Data;
// Padding is automatically set to empty vec and calculated during export
let data = Data::new(vec![vec![
(field1, value1),
(field2, value2),
]]);
See examples/manual_ipfix_creation.rs for a complete example of creating IPFIX packets from scratch.
use netflow_parser::{NetflowParser, NetflowPacket};
// 0000 00 05 00 01 03 00 04 00 05 00 06 07 08 09 00 01 ................
// 0010 02 03 04 05 06 07 08 09 00 01 02 03 04 05 06 07 ................
// 0020 08 09 00 01 02 03 04 05 06 07 08 09 00 01 02 03 ................
// 0030 04 05 06 07 08 09 00 01 02 03 04 05 06 07 08 09 ................
// 0040 00 01 02 03 04 05 06 07 ........
let packet = [
0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,
];
let result = NetflowParser::default().parse_bytes(&packet);
if let Some(NetflowPacket::V5(v5)) = result.packets.first() {
assert_eq!(v5.to_be_bytes(), packet);
}
Parse the data (&[u8]) like any other version. The parser (NetflowParser) caches parsed templates with LRU eviction, so you can send header/data flowset combinations and the cached templates will be used to decode them. Templates are evicted automatically when the cache limit is reached.
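A minimal sketch of that flow, with placeholder buffers standing in for real exporter datagrams: parse the datagram carrying the templates first, then later data-only datagrams through the same parser instance.
use netflow_parser::NetflowParser;
let template_datagram: Vec<u8> = Vec::new(); // placeholder: datagram carrying the template flowsets
let data_datagram: Vec<u8> = Vec::new();     // placeholder: later datagram with only data flowsets
let mut parser = NetflowParser::default();
let _templates = parser.parse_bytes(&template_datagram); // templates are parsed and cached
let _flows = parser.parse_bytes(&data_datagram);         // decoded using the cached templates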
Template Management: For comprehensive information about template caching, introspection, multi-source deployments, and best practices, see the Template Management Guide section below.
IPFIX Note: We only parse the sequence number and observation domain ID; validating them is up to you.
FlowSet Access: To access the flowsets of a processed V9/IPFIX packet, use the flowsets attribute on the parsed record. There you will find Templates, Option Templates, and Data Flowsets.
NetFlow V9 and IPFIX are template-based protocols where templates define the structure of flow records. This library provides comprehensive template management features to handle various deployment scenarios.
Track template cache performance to understand your parser's behavior:
use netflow_parser::NetflowParser;
let mut parser = NetflowParser::default();
// Parse some packets...
parser.parse_bytes(&data);
// Get cache statistics
let v9_stats = parser.v9_cache_stats();
println!("V9 Cache: {}/{} templates", v9_stats.current_size, v9_stats.max_size);
// Access performance metrics
let metrics = &v9_stats.metrics;
println!("Cache hits: {}", metrics.hits);
println!("Cache misses: {}", metrics.misses);
println!("Evictions: {}", metrics.evictions);
println!("Collisions: {}", metrics.collisions);
println!("Expired templates: {}", metrics.expired);
// Calculate hit rate
if let Some(hit_rate) = metrics.hit_rate() {
println!("Cache hit rate: {:.2}%", hit_rate * 100.0);
}
Metrics tracked:
⚠️ IMPORTANT: When parsing from multiple routers, template IDs can collide. Different routers often use the same template ID (e.g., 256) with completely different schemas, causing cache thrashing and parsing failures.
The Problem:
// ❌ DON'T: Multiple sources sharing one parser
let mut parser = NetflowParser::default();
loop {
let (data, source_addr) = recv_from_network();
parser.parse_bytes(&data); // Router A's template 256 overwrites Router B's!
}
The Solution - Use AutoScopedParser:
// ✅ DO: Each source gets isolated template cache (RFC-compliant)
use netflow_parser::AutoScopedParser;
use std::net::SocketAddr;
let mut parser = AutoScopedParser::new();
// Parser automatically handles RFC-compliant scoping:
// - NetFlow v9: Uses (source_addr, source_id) per RFC 3954
// - IPFIX: Uses (source_addr, observation_domain_id) per RFC 7011
// - NetFlow v5/v7: Uses source_addr only
let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
let packets = parser.parse_from_source(source, &data);
// Monitor cache health
if parser.source_count() > 1 {
println!("Parsing from {} sources with isolated caches", parser.source_count());
}
Why AutoScopedParser?
For specialized requirements beyond automatic RFC-compliant scoping, use RouterScopedParser with custom key types:
use netflow_parser::RouterScopedParser;
use std::net::SocketAddr;
// Example: Custom scoping for named sources
let mut scoped = RouterScopedParser::<String>::new();
scoped.parse_from_source("router-nyc-01".to_string(), &data);
// Example: Manual composite key (not recommended - use AutoScopedParser instead)
#[derive(Hash, Eq, PartialEq, Clone)]
struct CustomKey {
router_name: String,
region: String,
}
let mut scoped = RouterScopedParser::<CustomKey>::new();
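A usage sketch continuing the example above; it assumes parse_from_source accepts the custom key just like the String-keyed call shown earlier (the key values are illustrative):
let key = CustomKey {
    router_name: "router-nyc-01".to_string(),
    region: "us-east".to_string(),
};
// Each distinct key value gets its own isolated template cache.
scoped.parse_from_source(key, &data);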
When to use RouterScopedParser instead of AutoScopedParser:
For standard NetFlow/IPFIX deployments, use AutoScopedParser instead.
Configure parsers with custom settings:
use netflow_parser::{AutoScopedParser, NetflowParser};
use netflow_parser::variable_versions::ttl::TtlConfig;
use std::time::Duration;
// Configure AutoScopedParser
let builder = NetflowParser::builder()
.with_cache_size(5000)
.with_ttl(TtlConfig::new(Duration::from_secs(3600)));
let mut parser = AutoScopedParser::with_builder(builder);
// Or configure RouterScopedParser for custom scoping
use netflow_parser::RouterScopedParser;
let mut scoped = RouterScopedParser::<String>::with_builder(builder);
Monitor when template IDs are reused with different definitions:
let v9_stats = parser.v9_cache_stats();
if v9_stats.metrics.collisions > 0 {
println!("Warning: {} template collisions detected", v9_stats.metrics.collisions);
println!("Use AutoScopedParser for RFC-compliant multi-source deployments");
}
What counts as a collision:
What does NOT count as a collision:
When a data flowset arrives before its template (IPFIX):
use netflow_parser::{NetflowParser, NetflowPacket};
use netflow_parser::variable_versions::ipfix::FlowSetBody;
let mut parser = NetflowParser::default();
let mut pending_data = Vec::new();
for result in parser.iter_packets(&data) {
if let Ok(NetflowPacket::IPFix(ipfix)) = result {
for flowset in &ipfix.flowsets {
if let FlowSetBody::NoTemplate(info) = &flowset.body {
println!("Missing template ID: {}", info.template_id);
println!("Available templates: {:?}", info.available_templates);
// Save for retry after template arrives
pending_data.push(info.raw_data.clone());
}
}
}
}
// Retry pending data after templates arrive
for pending in &pending_data {
let _ = parser.parse_bytes(pending);
}
Inspect the template cache state at runtime without affecting LRU ordering:
use netflow_parser::NetflowParser;
let mut parser = NetflowParser::default();
// Get cache statistics
let v9_stats = parser.v9_cache_stats();
println!("V9 cache: {}/{} templates", v9_stats.current_size, v9_stats.max_size);
let ipfix_stats = parser.ipfix_cache_stats();
println!("IPFIX cache: {}/{} templates", ipfix_stats.current_size, ipfix_stats.max_size);
// List all cached template IDs
let v9_templates = parser.v9_template_ids();
println!("V9 template IDs: {:?}", v9_templates);
let ipfix_templates = parser.ipfix_template_ids();
println!("IPFIX template IDs: {:?}", ipfix_templates);
// Check if a specific template exists (doesn't affect LRU ordering)
if parser.has_v9_template(256) {
println!("Template 256 is cached");
}
// Clear all V9 templates
parser.clear_v9_templates();
// Clear all IPFIX templates
parser.clear_ipfix_templates();
// With RouterScopedParser - clear specific source
scoped_parser.clear_source_templates(&source_addr);
// Or clear all sources
scoped_parser.clear_all_templates();
Use AutoScopedParser for multi-source deployments ⭐
Monitor cache metrics
Configure appropriate cache size
- Compare current_size vs max_size to right-size the cache
Use TTL for long-running parsers
Handle missing templates gracefully
- Use NoTemplateInfo to understand what's missing
Thread safety with scoped parsers
- AutoScopedParser and RouterScopedParser are not thread-safe
- Use Arc<Mutex<AutoScopedParser>> for multi-threaded applications
Parsers (NetflowParser, V9Parser, IPFixParser) are not thread-safe and should not be shared across threads without external synchronization. Each parser maintains internal state (template caches) that is modified during parsing.
Recommended pattern for multi-threaded applications:
- See examples/netflow_udp_listener_multi_threaded.rs for an implementation example; a minimal sketch follows below.
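A minimal sketch of that pattern (socket handling omitted; each thread just locks the shared parser and parses a placeholder datagram):
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use netflow_parser::AutoScopedParser;
let parser = Arc::new(Mutex::new(AutoScopedParser::new()));
let mut handles = Vec::new();
for _ in 0..4 {
    let parser = Arc::clone(&parser);
    handles.push(thread::spawn(move || {
        // In a real listener each thread would receive (data, source) from a UDP socket.
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let data: Vec<u8> = Vec::new(); // placeholder for a received datagram
        let mut parser = parser.lock().unwrap();
        let _packets = parser.parse_from_source(source, &data);
    }));
}
for handle in handles {
    handle.join().unwrap();
}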
This library includes several performance optimizations.
Best practices for optimal performance:
- Use iter_packets() instead of parse_bytes() when you don't need all packets in a Vec
- Use parse_bytes_as_netflow_common_flowsets() when you only need flow data
Feature flags:
- parse_unknown_fields - When enabled, fields not listed in this library will be parsed as a Vec of bytes along with the field_number. When disabled, an error is thrown when attempting to parse those fields. Enabled by default.
- netflow_common - When enabled, provides the NetflowCommon and NetflowCommonFlowSet structures for working with common fields across different Netflow versions. Disabled by default.
Examples have been included mainly for those who want to use this parser to read from a socket and parse NetFlow. With V9/IPFIX it is best to create a new parser for each router. There are both single-threaded and multi-threaded examples in the examples directory.
Examples that listen on a specific port use 9995 by default; however, NetFlow can be configured to use a variety of UDP ports.
To run:
cargo run --example netflow_udp_listener_multi_threaded
cargo run --example netflow_udp_listener_single_threaded
cargo run --example netflow_udp_listener_tokio
cargo run --example netflow_pcap
cargo run --example manual_ipfix_creation
cargo run --example custom_enterprise_fields
The pcap example also shows how to cache flows whose template has not yet arrived. The custom_enterprise_fields example demonstrates how to register vendor-specific IPFIX fields.
If you find my work helpful, consider supporting me!