2024-03-01 07:43:15 +00:00
|
|
|
#!/usr/bin/python
|
|
|
|
|
2024-04-05 04:00:50 +00:00
|
|
|
import os
|
|
|
|
import pynetbox
|
|
|
|
|
2024-03-01 07:43:15 +00:00
|
|
|
class FilterModule(object):
    '''Various utilities for manipulating NetBox data

    Exposes the filters ``device_address``, ``compact_numlist`` and
    ``allowed_prefixes`` through :meth:`filters`.  Only ``allowed_prefixes``
    talks to NetBox; the other two operate purely on the data handed to them.
    '''

    def __init__(self):
        '''Create the NetBox API client when the environment provides credentials.

        ``self.nb`` is a ``pynetbox.api`` instance when both ``NETBOX_API`` and
        ``NETBOX_TOKEN`` are present in the environment, otherwise ``None``.
        '''
        # Always define the attribute so a missing configuration can be
        # reported clearly at the point of use instead of surfacing as a bare
        # "object has no attribute 'nb'" error.
        self.nb = None
        if 'NETBOX_API' in os.environ and 'NETBOX_TOKEN' in os.environ:
            self.nb = pynetbox.api(os.getenv('NETBOX_API'), os.getenv('NETBOX_TOKEN'))

    def filters(self):
        '''Return the mapping of filter names to their implementations'''
        return {
            'device_address': self.device_address,
            'compact_numlist': self.compact_numlist,
            'allowed_prefixes': self.allowed_prefixes,
        }

    def _netbox(self):
        '''Return the NetBox client, or raise if it was never configured'''
        client = getattr(self, 'nb', None)
        if client is None:
            # AttributeError is kept on purpose: it is the exception type that
            # was raised before when the client was missing, only the message
            # is now actionable.
            raise AttributeError(
                'NetBox client is not configured: '
                'set the NETBOX_API and NETBOX_TOKEN environment variables'
            )
        return client

    def device_address(self, device):
        '''Return loopback IP addresses for an L3 attached device

        Walks ``device['interfaces']`` and, for each interface, its
        ``ip_addresses``; lazily yields every address entry whose
        ``role['value']`` equals ``'loopback'``.  Addresses without a role
        (missing or falsy ``role``) are skipped.
        '''
        for iface in device['interfaces']:
            for addr in iface['ip_addresses']:
                role = addr.get('role')
                if role and role.get('value') == 'loopback':
                    yield addr

    def compact_numlist(self, nums, delimiter=',', range_delimiter='-'):
        '''Transform [1,2,3,5,7,8,9] into "1-3,5,7-9"

        ``nums`` may be any iterable of numbers (list, tuple, generator, ...).
        Runs are detected between neighbouring items that increase by exactly
        one; the input is not sorted here, so pass it in ascending order to
        get fully compacted output.  Spans are joined with ``delimiter`` and
        the two ends of a run with ``range_delimiter``.  An empty input gives
        an empty string.
        '''
        # Materialise once: the algorithm needs len() and indexing, which
        # generators/iterators (common in Jinja pipelines) do not support.
        nums = list(nums)
        total = len(nums)
        spans = []
        i = 0
        while i < total:
            # Extend j while the value offset matches the index offset,
            # i.e. while nums[i:j+1] is a run of consecutive numbers.
            j = i + 1
            while j < total and nums[j] - nums[i] == j - i:
                j += 1
            if j > i + 1:
                spans.append(f'{nums[i]}{range_delimiter}{nums[j-1]}')
            else:
                spans.append(f'{nums[i]}')
            i = j
        return delimiter.join(spans)

    def allowed_prefixes(self, service):
        '''Return a list of allowed IP prefixes for the given service

        ``service`` is either a mapping holding the allow-list fields directly
        or one that nests them under ``custom_fields``.  Results are yielded
        lazily, in this order:

        * prefixes whose id is listed in ``allowed_prefixes``
        * prefixes attached to the VLANs listed in ``allowed_vlans``
        * for every device in the clusters listed in ``allowed_clusters``,
          its IP addresses with role ``loopback`` (these are IP address
          records, not prefix records)

        NetBox is only contacted for fields that are present and non-empty.
        Raises ``AttributeError`` on iteration if such a field is set but no
        NetBox client is configured.
        '''
        if 'custom_fields' in service:
            service = service['custom_fields']
        if prefixes := service.get('allowed_prefixes'):
            prefix_ids = [prefix['id'] for prefix in prefixes]
            yield from self._netbox().ipam.prefixes.filter(id=prefix_ids)
        if vlans := service.get('allowed_vlans'):
            vlan_ids = [vlan['id'] for vlan in vlans]
            yield from self._netbox().ipam.prefixes.filter(vlan_id=vlan_ids)
        if clusters := service.get('allowed_clusters'):
            nb = self._netbox()
            cluster_ids = [cluster['id'] for cluster in clusters]
            # One filter call per device, so an empty device list can never
            # turn into an unconstrained "all loopbacks" lookup.
            for device in nb.dcim.devices.filter(cluster_id=cluster_ids):
                yield from nb.ipam.ip_addresses.filter(role='loopback', device_id=device.id)
|