From c7c71faa112d6b0ee5cd3b649298c00b275de4f0 Mon Sep 17 00:00:00 2001 From: Hitoshi Irino Date: Wed, 21 Feb 2024 23:06:52 +0900 Subject: [PATCH] Fixed analyzer.py to display collected flows whose version is not 9 --- netflow/analyzer.py | 68 ++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/netflow/analyzer.py b/netflow/analyzer.py index 7448b84..0098ae2 100644 --- a/netflow/analyzer.py +++ b/netflow/analyzer.py @@ -96,8 +96,8 @@ def __init__(self, flow1, flow2): # Assume the size that sent the most data is the source # TODO: this might not always be right, maybe use earlier timestamp? - size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS']) - size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS']) + size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"]) + size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"]) if size1 >= size2: src = flow1 dest = flow2 @@ -120,12 +120,24 @@ def __init__(self, flow1, flow2): ips = self.get_ips(src) self.src = ips.src self.dest = ips.dest - self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT']) - self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT']) - self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS']) + self.proto = fallback(src, ['PROTOCOL', 'PROTO', 'protocolIdentifier']) + # ICMP and ICMPv6 does not include port fields + if self.proto == 1 or self.proto == 58: + self.src_port = 0 + # ICMP field is treated as destination port + try: + self.dest_port = fallback(dest, ['ICMP_TYPE', 'icmpTypeCodeIPv4', 'icmpTypeCodeIPv6']) + except: + self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort']) + else: + self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT', 'sourceTransportPort']) + self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort']) + self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS', 'octetDeltaCount']) # Duration is given in milliseconds - self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED'] + lastSwitched = fallback(src, ['LAST_SWITCHED', 'flowEndSysUpTime']) + firstSwitched = fallback(src, ['FIRST_SWITCHED', 'flowStartSysUpTime']) + self.duration = lastSwitched - firstSwitched if self.duration < 0: # 32 bit int has its limits. Handling overflow here # TODO: Should be handled in the collection phase @@ -139,16 +151,17 @@ def __repr__(self): def get_ips(flow): # IPv4 if flow.get('IP_PROTOCOL_VERSION') == 4 or \ + 'sourceIPv4Address' in flow or 'destinationIPv4Address' in flow or\ 'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow: return Pair( - ipaddress.ip_address(flow['IPV4_SRC_ADDR']), - ipaddress.ip_address(flow['IPV4_DST_ADDR']) + ipaddress.ip_address(fallback(flow, ['IPV4_SRC_ADDR', 'sourceIPv4Address'])), + ipaddress.ip_address(fallback(flow, ['IPV4_DST_ADDR', 'destinationIPv4Address'])) ) # IPv6 return Pair( - ipaddress.ip_address(flow['IPV6_SRC_ADDR']), - ipaddress.ip_address(flow['IPV6_DST_ADDR']) + ipaddress.ip_address(fallback(flow, ['IPV6_SRC_ADDR', 'sourceIPv6Address'])), + ipaddress.ip_address(fallback(flow, ['IPV6_DST_ADDR', 'destinationIPv6Address'])) ) @property @@ -179,7 +192,13 @@ def service(self): @property def total_packets(self): - return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"] + src_flow_packets = fallback( + self.src_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"] + ) + dest_flow_packets = fallback( + self.dest_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"] + ) + return src_flow_packets + dest_flow_packets if __name__ == "netflow.analyzer": @@ -236,10 +255,6 @@ def total_packets(self): logger.error("No header dict in entry {}".format(ts)) raise ValueError - if entry[ts]["header"]["version"] == 10: - logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented") - continue - data[ts] = entry[ts] # Go through data and dissect every flow saved inside the dump @@ -258,20 +273,29 @@ def total_packets(self): client = data[key]["client"] flows = data[key]["flows"] - for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]): - first_switched = flow["FIRST_SWITCHED"] + for flow in sorted(flows, + key=lambda x:fallback(x, + ["FIRST_SWITCHED", "flowStartSysUpTime", "systemInitTimeMilliseconds"], + ), + ): + if "systemInitTimeMilliseconds" in flow: + # systemInitTimeMilliseconds exists in only option data record + continue + first_switched = fallback(flow, ["FIRST_SWITCHED", "flowStartSysUpTime"]) if first_switched - 1 in pending: # TODO: handle fitting, yet mismatching (here: 1 second) pairs pass # Find the peer for this connection - if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4: - local_peer = flow["IPV4_SRC_ADDR"] - remote_peer = flow["IPV4_DST_ADDR"] + if ("IPV4_SRC_ADDR" in flow or "sourceIPv4Address" in flow + or fallback(flow, ["IP_PROTOCOL_VERSION", "ipVersion"]) == 4 + ): + local_peer = fallback(flow, ["IPV4_SRC_ADDR", "sourceIPv4Address"]) + remote_peer = fallback(flow, ["IPV4_DST_ADDR", "destinationIPv4Address"]) else: - local_peer = flow["IPV6_SRC_ADDR"] - remote_peer = flow["IPV6_DST_ADDR"] + local_peer = fallback(flow, ["IPV6_SRC_ADDR", "sourceIPv6Address"]) + remote_peer = fallback(flow, ["IPV6_DST_ADDR", "destinationIPv6Address"]) # Match on host filter passed in as argument if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):