Like allowed_prefixes, but for single IP addresses. Currently used only by the DHCP server, to allow packets from relays only.
		
			
				
	
	
		
			59 lines
		
	
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			59 lines
		
	
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/python
 | |
| 
 | |
| import ipaddress
 | |
| import os
 | |
| import pynetbox
 | |
| 
 | |
class FilterModule:
    '''Various utilities for manipulating NetBox data.

    The filters are exposed through filters(). Only allowed_prefixes may
    need to query NetBox; the API client for that is created from the
    NETBOX_API and NETBOX_TOKEN environment variables.
    '''

    def __init__(self):
        # Always define the attribute, so that a missing configuration can be
        # reported clearly at the point where a NetBox lookup is attempted.
        self.nb = None
        if 'NETBOX_API' in os.environ and 'NETBOX_TOKEN' in os.environ:
            self.nb = pynetbox.api(os.getenv('NETBOX_API'), os.getenv('NETBOX_TOKEN'))

    def filters(self):
        '''Return the mapping of filter names to their implementations.'''
        return {
            'device_address': self.device_address,
            'compact_numlist': self.compact_numlist,
            'allowed_prefixes': self.allowed_prefixes,
        }

    def _netbox(self):
        '''Return the NetBox API client, failing clearly if it is not configured.'''
        client = getattr(self, 'nb', None)
        if client is None:
            # AttributeError is the exception type a missing client has always
            # produced here; only the message is made more helpful.
            raise AttributeError(
                'NetBox API client is not configured; '
                'set the NETBOX_API and NETBOX_TOKEN environment variables')
        return client

    def device_address(self, device):
        '''Yield loopback IP addresses for an L3 attached device.

        Walks device['interfaces'] and each interface's 'ip_addresses', and
        yields every address entry whose role value is 'loopback'. Entries
        with a missing or empty role are skipped.
        '''
        for iface in device['interfaces']:
            for addr in iface['ip_addresses']:
                role = addr.get('role')
                if role and role.get('value') == 'loopback':
                    yield addr

    def compact_numlist(self, nums, delimiter=',', range_delimiter='-'):
        '''Transform [1,2,3,5,7,8,9] into "1-3,5,7-9".

        nums may be any iterable of numbers. Only runs of consecutive values
        in the given order are merged; the input is neither sorted nor
        deduplicated. Spans are joined with delimiter, and the two ends of a
        span are joined with range_delimiter. Empty input gives "".
        '''
        # Materialize once so generators and other lazy iterables are accepted.
        nums = list(nums)
        spans = []
        i = 0
        while i < len(nums):
            # Extend j while nums[i:j+1] still increases by exactly one per step.
            j = i + 1
            while j < len(nums) and nums[j] - nums[i] == j - i:
                j += 1
            if j > i + 1:
                spans.append(f'{nums[i]}{range_delimiter}{nums[j-1]}')
            else:
                spans.append(f'{nums[i]}')
            i = j
        return delimiter.join(spans)

    def allowed_prefixes(self, service):
        '''Yield allowed IP prefixes, as strings, for the given service.

        The following keys of service (or of service['custom_fields'], when
        present) are consulted, in this order:

        - allowed_ips: each entry's 'address' is yielded as a host prefix,
          i.e. with the original mask replaced by the full-length one.
        - allowed_prefixes: prefixes fetched from NetBox by id.
        - allowed_vlans: prefixes fetched from NetBox by VLAN id.
        - allowed_clusters: loopback addresses of all devices in the clusters.

        All keys except allowed_ips need the NetBox API client; if it is not
        configured, AttributeError is raised when such a lookup is reached.
        '''
        # Depending on how service was obtained, actual data might be nested under custom_fields.
        if 'custom_fields' in service:
            service = service['custom_fields']

        def ip2str(address, single=False):
            if single:
                # Drop the mask; the result is then rendered as a host
                # prefix: 1.2.3.4/24 → 1.2.3.4/32.
                address = ipaddress.ip_interface(address).ip
            return str(ipaddress.ip_interface(address))

        if ips := service.get('allowed_ips'):
            yield from (ip2str(ip['address'], single=True) for ip in ips)
        if prefixes := service.get('allowed_prefixes'):
            prefix_ids = [prefix['id'] for prefix in prefixes]
            yield from (ip2str(e.prefix) for e in self._netbox().ipam.prefixes.filter(id=prefix_ids))
        if vlans := service.get('allowed_vlans'):
            vlan_ids = [vlan['id'] for vlan in vlans]
            yield from (ip2str(e.prefix) for e in self._netbox().ipam.prefixes.filter(vlan_id=vlan_ids))
        if clusters := service.get('allowed_clusters'):
            nb = self._netbox()
            cluster_ids = [cluster['id'] for cluster in clusters]
            for device in nb.dcim.devices.filter(cluster_id=cluster_ids):
                yield from (ip2str(e.address) for e in nb.ipam.ip_addresses.filter(role='loopback', device_id=device.id))
 |