servers/filter_plugins/netbox.py

48 lines
1.9 KiB
Python
Raw Normal View History

#!/usr/bin/python
import os
import pynetbox
class FilterModule(object):
    '''Various utilities for manipulating NetBox data'''

    # Cached pynetbox client; stays None until a filter first needs NetBox.
    _nb = None

    @property
    def nb(self):
        '''Return the pynetbox API client, creating it on first access.

        The client is built lazily from the NETBOX_API and NETBOX_TOKEN
        environment variables, so constructing the plugin and using the
        filters that never read ``self.nb`` needs no NetBox configuration.

        Raises:
            RuntimeError: if NETBOX_API is unset or empty.
        '''
        if self._nb is None:
            url = os.getenv('NETBOX_API')
            if not url:
                raise RuntimeError('NETBOX_API environment variable is not set')
            self._nb = pynetbox.api(url, os.getenv('NETBOX_TOKEN'))
        return self._nb

    @nb.setter
    def nb(self, client):
        '''Allow an already-built client to be assigned to ``nb``.'''
        self._nb = client

    def filters(self):
        '''Return the mapping of filter names to the methods implementing them'''
        return {
            'device_address': self.device_address,
            'compact_numlist': self.compact_numlist,
            'allowed_prefixes': self.allowed_prefixes,
        }

    def device_address(self, device):
        '''Yield loopback IP addresses for an L3 attached device

        Walks ``device['interfaces']`` and every interface's
        ``'ip_addresses'`` list, yielding each address entry whose
        ``role`` has the value ``'loopback'``. Entries with a missing or
        empty role are skipped.
        '''
        for iface in device['interfaces']:
            for addr in iface['ip_addresses']:
                role = addr.get('role')
                if role and role.get('value') == 'loopback':
                    yield addr

    def compact_numlist(self, nums, delimiter=',', range_delimiter='-'):
        '''Transform [1,2,3,5,7,8,9] into "1-3,5,7-9"

        ``nums`` may be any iterable of numbers (a list or a generator);
        it is read in the order given, so it should already be sorted
        ascending for runs to collapse. Each run of values that increase
        by exactly one is written as ``first<range_delimiter>last``; a
        value on its own is written as is. The pieces are joined with
        ``delimiter``. An empty input gives an empty string.
        '''
        # Materialize first: the loop below needs len() and indexing, which
        # generators do not support.
        nums = list(nums)
        spans = []
        i = 0
        while i < len(nums):
            # Advance j while nums[i:j+1] is still a run of consecutive values.
            j = i + 1
            while j < len(nums) and nums[j] - nums[i] == j - i:
                j += 1
            if j > i + 1:
                spans.append(f'{nums[i]}{range_delimiter}{nums[j - 1]}')
            else:
                spans.append(f'{nums[i]}')
            i = j
        return delimiter.join(spans)

    def allowed_prefixes(self, service):
        '''Yield the allowed IP prefixes for the given service

        Looks the service up in NetBox by ``service['id']`` and reads
        three of its custom fields. For each one that is non-empty it
        yields, in this order:

        * ``allowed_prefixes``: the prefixes with the listed ids;
        * ``allowed_vlans``: the prefixes filtered by the listed VLAN ids;
        * ``allowed_clusters``: for every device filtered by the listed
          cluster ids, that device's IP addresses with role ``loopback``
          (these are IP address records, not prefix records).

        Being a generator, no NetBox request is made until the result is
        iterated.
        '''
        service_data = self.nb.ipam.services.get(service['id']).custom_fields
        if service_data['allowed_prefixes']:
            prefix_ids = [prefix['id'] for prefix in service_data['allowed_prefixes']]
            yield from self.nb.ipam.prefixes.filter(id=prefix_ids)
        if service_data['allowed_vlans']:
            vlan_ids = [vlan['id'] for vlan in service_data['allowed_vlans']]
            yield from self.nb.ipam.prefixes.filter(vlan_id=vlan_ids)
        if service_data['allowed_clusters']:
            cluster_ids = [cluster['id'] for cluster in service_data['allowed_clusters']]
            for device in self.nb.dcim.devices.filter(cluster_id=cluster_ids):
                yield from self.nb.ipam.ip_addresses.filter(role='loopback', device_id=device.id)