servers/filter_plugins/netbox.py
Timotej Lazar e7f9132571 proxmox: set up firewall
Firewall policy is set in NetBox as cluster services¹. For Proxmox we
have to manually allow communication between nodes when using L3,
since the default management ipset does not get populated correctly.
We also need to open VTEP communication between nodes, which the
default rules don’t. We allow all inter-node traffic, as SSH without
passwords must be permitted anyway.

This also adds some helper filters that are spectacularly annoying to
implement purely in templates.

¹ There is actually no such thing as a cluster service (yet?), so
instead we create a fake VM for the cluster, define services for it,
and then add the same services to a custom field on the cluster.
An alternative would be to tie services to a specific node, but that
could be problematic if that node is replaced.
2024-04-05 06:00:50 +02:00

48 lines
1.9 KiB
Python

#!/usr/bin/python
import os
import pynetbox
class FilterModule(object):
    '''Various utilities for manipulating NetBox data'''

    def __init__(self):
        # The API endpoint and token are read from the environment.
        self.nb = pynetbox.api(os.getenv('NETBOX_API'), os.getenv('NETBOX_TOKEN'))

    def filters(self):
        '''Return the mapping of filter names to their implementations'''
        return {
            'device_address': self.device_address,
            'compact_numlist': self.compact_numlist,
            'allowed_prefixes': self.allowed_prefixes
        }

    def device_address(self, device):
        '''Yield loopback IP addresses for an L3 attached device

        Walks every entry in device['interfaces'] and yields each item of
        its 'ip_addresses' whose role value is 'loopback'. Addresses
        with a missing or empty 'role' are skipped.
        '''
        for iface in device['interfaces']:
            for addr in iface['ip_addresses']:
                if addr.get('role') and addr['role'].get('value') == 'loopback':
                    yield addr

    def compact_numlist(self, nums, delimiter=',', range_delimiter='-'):
        '''Transform [1,2,3,5,7,8,9] into "1-3,5,7-9"

        nums may be any iterable of numbers, including a generator such
        as those produced by Jinja2 filters. Order is preserved; only
        runs that increase by exactly one are collapsed, so the input
        is expected to be sorted in ascending order. Spans are joined
        with delimiter, and the two ends of a run are joined with
        range_delimiter. An empty input gives an empty string.
        '''
        # Materialize once: the scan below needs len() and indexing,
        # which generators and iterators do not support.
        nums = list(nums)
        spans = []
        i = 0
        while i < len(nums):
            # Extend j while nums[i:j] is a run of consecutive values.
            j = i + 1
            while j < len(nums) and nums[j] - nums[i] == j - i:
                j += 1
            if j > i + 1:
                spans.append(f'{nums[i]}{range_delimiter}{nums[j-1]}')
            else:
                spans.append(f'{nums[i]}')
            i = j
        return delimiter.join(spans)

    def allowed_prefixes(self, service):
        '''Yield the allowed NetBox prefixes and addresses for the given service

        The service is looked up in NetBox by service['id'] and its
        custom fields are read. Results are yielded in this order:

        - prefixes listed in the 'allowed_prefixes' custom field,
        - prefixes assigned to VLANs listed in 'allowed_vlans',
        - loopback IP addresses of devices belonging to clusters listed
          in 'allowed_clusters'.

        Note that the first two groups are prefix records while the last
        is IP address records. Custom fields that are empty are skipped.
        '''
        # NOTE(review): services.get() presumably returns None for an
        # unknown id, which would raise AttributeError here - confirm.
        service_data = self.nb.ipam.services.get(service['id']).custom_fields
        if service_data['allowed_prefixes']:
            yield from self.nb.ipam.prefixes.filter(id=[prefix['id'] for prefix in service_data['allowed_prefixes']])
        if service_data['allowed_vlans']:
            yield from self.nb.ipam.prefixes.filter(vlan_id=[vlan['id'] for vlan in service_data['allowed_vlans']])
        if service_data['allowed_clusters']:
            for device in self.nb.dcim.devices.filter(cluster_id=[cluster['id'] for cluster in service_data['allowed_clusters']]):
                yield from self.nb.ipam.ip_addresses.filter(role='loopback', device_id=device.id)