#!/usr/bin/python

import ipaddress
import os

try:
    import pynetbox
except ImportError:
    # Keep the filters that only reshape data usable without pynetbox; the
    # missing library is reported when a NetBox lookup is actually attempted.
    pynetbox = None


class FilterModule(object):
    '''Various utilities for manipulating NetBox data'''

    def __init__(self):
        '''Create the NetBox API client when NETBOX_API and NETBOX_TOKEN are set.

        ``self.nb`` is always defined; it stays ``None`` when the client
        cannot be created (missing environment variables or missing pynetbox).
        '''
        self.nb = None
        if pynetbox is not None and 'NETBOX_API' in os.environ and 'NETBOX_TOKEN' in os.environ:
            self.nb = pynetbox.api(os.getenv('NETBOX_API'), os.getenv('NETBOX_TOKEN'))

    def filters(self):
        '''Return the mapping of filter names to their implementations.'''
        return {
            'device_address': self.device_address,
            'compact_numlist': self.compact_numlist,
            'allowed_prefixes': self.allowed_prefixes,
        }

    def _netbox(self):
        '''Return the NetBox API client or raise RuntimeError explaining why there is none.'''
        nb = getattr(self, 'nb', None)
        if nb is None:
            if pynetbox is None:
                raise RuntimeError('the pynetbox library is required for NetBox lookups')
            raise RuntimeError('NETBOX_API and NETBOX_TOKEN must be set for NetBox lookups')
        return nb

    @staticmethod
    def _ip2str(address, single=False):
        '''Format an address or prefix in "address/length" notation.

        With ``single=True`` the original mask is dropped first, so the result
        carries the full-length mask: 1.2.3.4/24 -> 1.2.3.4/32.
        '''
        if single:
            address = ipaddress.ip_interface(address).ip
        return str(ipaddress.ip_interface(address))

    def device_address(self, device):
        '''Return loopback IP addresses for an L3 attached device

        Yields every entry of ``iface['ip_addresses']``, over all of
        ``device['interfaces']``, whose ``role.value`` is ``'loopback'``.
        Addresses with a missing or empty ``role`` are skipped.
        '''
        for iface in device['interfaces']:
            for addr in iface['ip_addresses']:
                role = addr.get('role')
                if role and role.get('value') == 'loopback':
                    yield addr

    def compact_numlist(self, nums, delimiter=',', range_delimiter='-'):
        '''Transform [1,2,3,5,7,8,9] into "1-3,5,7-9"

        ``nums`` may be any iterable of integers (including a generator);
        runs are detected on the sequence as given, no sorting is done.
        ``delimiter`` separates spans, ``range_delimiter`` joins the two ends
        of a run. An empty input gives an empty string.
        '''
        # Materialize once: the scan below needs len() and indexing, which
        # iterators and generators do not provide.
        nums = list(nums)
        spans = []
        start = 0
        while start < len(nums):
            # Extend the run while values keep increasing by exactly one per step.
            end = start + 1
            while end < len(nums) and nums[end] - nums[start] == end - start:
                end += 1
            if end > start + 1:
                spans.append(f'{nums[start]}{range_delimiter}{nums[end - 1]}')
            else:
                spans.append(f'{nums[start]}')
            start = end
        return delimiter.join(spans)

    def allowed_prefixes(self, service):
        '''Return a list of allowed IP prefixes for the given service

        Yields strings in "address/length" notation, in this order: the
        service's ``allowed_ips`` (as host prefixes), then the prefixes named
        by ``allowed_prefixes``, the prefixes of ``allowed_vlans`` and the
        loopback addresses of devices in ``allowed_clusters``. Only
        ``allowed_ips`` works without a NetBox client; the other fields raise
        RuntimeError when they are reached and no client is available.
        '''
        # Depending on how service was obtained, actual data might be nested under custom_fields.
        if 'custom_fields' in service:
            service = service['custom_fields']

        if ips := service.get('allowed_ips'):
            yield from (self._ip2str(ip['address'], single=True) for ip in ips)

        if prefixes := service.get('allowed_prefixes'):
            found = self._netbox().ipam.prefixes.filter(id=[prefix['id'] for prefix in prefixes])
            yield from (self._ip2str(e.prefix) for e in found)

        if vlans := service.get('allowed_vlans'):
            found = self._netbox().ipam.prefixes.filter(vlan_id=[vlan['id'] for vlan in vlans])
            yield from (self._ip2str(e.prefix) for e in found)

        if clusters := service.get('allowed_clusters'):
            nb = self._netbox()
            cluster_ids = [cluster['id'] for cluster in clusters]
            for device in nb.dcim.devices.filter(cluster_id=cluster_ids):
                loopbacks = nb.ipam.ip_addresses.filter(role='loopback', device_id=device.id)
                yield from (self._ip2str(e.address) for e in loopbacks)