Timotej Lazar
3c25cbe88a
Custom keys are created by admin and specify networks directly, bypassing AD permissions. They are intended to join managed devices into networks where users are not allowed to create keys themselves. Also comprehend a set directly.
145 lines
5.6 KiB
Python
145 lines
5.6 KiB
Python
import datetime
|
|
import ipaddress
|
|
import json
|
|
import re
|
|
import subprocess
|
|
|
|
import flask
|
|
import flask_login
|
|
|
|
from . import db
|
|
from . import system
|
|
|
|
# Flask blueprint that collects all VPN key management views defined in this module.
blueprint = flask.Blueprint(name='vpn', import_name=__name__)
|
|
# Accepts exactly 44 characters from the base64 alphabet (the textual length of a WireGuard key).
# Anchored with \Z instead of $, because $ also matches just before a trailing newline and
# would let a key followed by '\n' pass validation.
wgkey_regex = re.compile(r'^[A-Za-z0-9/+=]{44}\Z')
|
|
|
|
@blueprint.route('/')
@flask_login.login_required
def index():
    """Serve the VPN landing page to a logged-in user."""
    page = flask.render_template('vpn/index.html')
    return page
|
|
|
|
@blueprint.route('/custom')
@flask_login.login_required
def custom():
    """Render the custom keys page; non-admin users get a 403 response.

    Custom keys are the entries that specify networks and have no owning user.
    The template also receives the names of all known networks and ipsets.
    """
    if not flask_login.current_user.is_admin:
        return flask.Response('forbidden', status=403, mimetype='text/plain')

    with db.locked():
        # Keep only entries that specify networks and are not bound to a user.
        keys = {}
        for address, entry in db.read('wireguard').items():
            if entry.get('networks') and not entry.get('user'):
                keys[address] = entry
        networks = db.read('networks')
        ipsets = networks | db.read('ipsets')

    return flask.render_template('vpn/custom.html', keys=keys, ipsets=ipsets.keys())
|
|
|
|
def _is_active(remote_addr, ip, ip6):
    """Return True if remote_addr belongs to the key with the given addresses.

    ip is the key's IPv4 address as a string. ip6 is the key's IPv6 network in
    'address/prefixlen' form as stored by new(), or None. A plain string
    comparison can never match such a network, so the client address is
    tested for membership in it instead.
    """
    if not remote_addr:
        return False
    if remote_addr == ip:
        return True
    if not ip6:
        return False
    try:
        # strict=False also accepts a bare address (treated as a single-host network).
        return ipaddress.ip_address(remote_addr) in ipaddress.ip_network(ip6, strict=False)
    except ValueError:
        # Unparsable client address or stored network: treat as not active.
        return False


@blueprint.route('/list')
@flask_login.login_required
def list():
    """Return the logged-in user's keys as JSON.

    Each entry is the stored key data extended with its 'ip' and an 'active'
    flag marking the key used for the current connection (if any).
    """
    user = flask_login.current_user.get_id()
    remote_addr = flask.request.remote_addr
    return flask.jsonify([
        data | {'ip': ip, 'active': _is_active(remote_addr, ip, data.get('ip6'))}
        for ip, data in db.load('wireguard').items() if data.get('user') == user
    ])


@blueprint.route('/list-custom')
@flask_login.login_required
def list_custom():
    """Return all custom keys as JSON; non-admin users get a 403 response.

    Custom keys are the entries that specify networks and have no owning user.
    Entries have the same shape as those returned by list().
    """
    if not flask_login.current_user.is_admin:
        return flask.Response('forbidden', status=403, mimetype='text/plain')
    remote_addr = flask.request.remote_addr
    return flask.jsonify([
        data | {'ip': ip, 'active': _is_active(remote_addr, ip, data.get('ip6'))}
        for ip, data in db.load('wireguard').items() if data.get('networks') and not data.get('user')
    ])
|
|
|
|
@blueprint.route('/new', methods=('POST',))
@flask_login.login_required
def new():
    """Register a new WireGuard key and return the rendered client configuration.

    The JSON request body supplies 'pubkey' and optionally 'options' ('name',
    'use-dns', 'add-default') and 'networks'. When an admin supplies
    'networks', the key is stored as a custom key with those networks;
    otherwise it is stored for the current user.

    Each key is assigned a new IPv4 address from the pool settings['wg_net'].
    Each key is then assigned a corresponding IPv6 subnet, depending on the
    amount of surplus addresses available. For wg_net 10.10.0.0/18 and
    wg_net6 1234:5678:90ab:cdef::/64, the key for 10.10.0.10/32 would get
    1234:5678:90ab:cdef:a::/80.

    Returns a 400 response for an invalid key and a 500 response when the
    IPv4 pool is exhausted.
    """
    def ipv4to6(net4, index, net6):
        # Calculate the address and prefix length for the IPv6 network that can be assigned
        # to the key with the given 1-based index in the IPv4 pool.
        len4 = net4.max_prefixlen - net4.prefixlen
        len6 = net6.max_prefixlen - net6.prefixlen
        # Make sure the network address ends at a colon. Wastes some addresses but IPv6.
        assigned = (len6 - len4) - (len6 - len4) % 16
        ip6 = (net6.network_address + (index << assigned)).compressed
        return ip6 + '/' + str(net6.max_prefixlen - assigned)

    pubkey = flask.request.json.get('pubkey', '')
    options = flask.request.json.get('options', {})
    # Reject non-string values explicitly; the regex would raise TypeError on them.
    if not isinstance(pubkey, str) or not wgkey_regex.match(pubkey):
        return flask.Response('invalid key', status=400, mimetype='text/plain')

    # Read server’s private key and find the corresponding public key.
    settings = db.load('settings')
    server_pubkey = subprocess.run(['wg', 'pubkey'], input=settings.get('wg_key'),
                                   text=True, capture_output=True).stdout.strip()

    # Use an aware datetime: timestamp() on a naive value would interpret it as local time.
    now = datetime.datetime.now(datetime.timezone.utc)
    key = {
        'key': pubkey,
        'time': now.timestamp(),
        'name': re.sub('[^-._A-Za-z0-9]', '', options.get('name', '')),
    }

    # Determine IPv4 and IPv6 addresses for the new key.
    host = ipaddress.ip_interface(settings.get('wg_net') or '10.0.0.1/24')
    key['ip6'] = None
    with db.locked():
        # Find a free address for the new key.
        keys = db.read('wireguard')
        for index, ip in enumerate(host.network.hosts(), start=1):
            if ip != host.ip and str(ip) not in keys:
                if wg_net6 := settings.get('wg_net6'):
                    key['ip6'] = ipv4to6(host.network, index, ipaddress.ip_interface(wg_net6).network)
                break
        else:
            return flask.Response('no more available IP addresses', status=500, mimetype='text/plain')

        # Add remaining attributes to new key and update key database.
        if flask_login.current_user.is_admin and flask.request.json.get('networks'):
            key['networks'] = flask.request.json.get('networks')
        else:
            key['user'] = flask_login.current_user.get_id()
        keys[str(ip)] = key
        db.write('wireguard', keys)

    # Generate a new config archive for firewall nodes.
    system.run(system.save_config)

    # Template arguments.
    args = {
        'server': settings.get('wg_endpoint'),
        'port': settings.get('wg_port') or '51820',
        'server_key': server_pubkey,
        'pubkey': pubkey,
        # Naive UTC value, so the template receives the same kind of object as before.
        'timestamp': now.replace(tzinfo=None),
        'ip': str(ip),
        'ip6': key['ip6'],
        'name': key['name'],
        'user': key.get('user', 'custom'),
        'dns': settings.get('wg_dns', '') if options.get('use-dns', True) else False,
        'allowed_nets': settings.get('wg_allowed_nets', ''),
        'add_default': options.get('add-default', False),
    }
    return flask.render_template('vpn/wg-fri.conf', **args)
|
|
|
|
@blueprint.route('/del', methods=('POST',))
@flask_login.login_required
def delete():
    """Remove the key given as 'pubkey' in the JSON request body.

    An entry is removed when its key matches and it either belongs to the
    current user or the current user is an admin. The key database is
    rewritten and the config is saved again regardless of whether any entry
    matched. Returns a 400 response for an invalid key.
    """
    pubkey = flask.request.json.get('pubkey', '')
    if wgkey_regex.match(pubkey) is None:
        return flask.Response('invalid key', status=400, mimetype='text/plain')

    with db.locked():
        user = flask_login.current_user.get_id()
        remaining = {}
        for address, entry in db.read('wireguard').items():
            if entry.get('key') == pubkey and (entry.get('user') == user or flask_login.current_user.is_admin):
                # Matching key that the caller may remove: leave it out.
                continue
            remaining[address] = entry
        db.write('wireguard', remaining)

    system.run(system.save_config)

    return flask.Response(f'deleted key {pubkey}', status=200, mimetype='text/plain')
|