Timotej Lazar
fd9a46377b
For IPv6 addresses we cannot just compare string data, since we register a whole subnet for each WG key. Also drop the active tunnel check from list_custom endpoint.
146 lines
5.6 KiB
Python
146 lines
5.6 KiB
Python
import datetime
|
|
import ipaddress
|
|
import json
|
|
import re
|
|
import subprocess
|
|
|
|
import flask
|
|
import flask_login
|
|
|
|
from . import db
|
|
from . import ipsets
|
|
from . import system
|
|
|
|
# Flask blueprint named 'vpn'; the route decorators in this file register their views on it.
blueprint = flask.Blueprint('vpn', __name__)
|
|
# Matches exactly 44 characters drawn from letters, digits, '/', '+' and '='.
# Used in this file to validate the 'pubkey' value submitted with a request.
# Note: '$' also matches just before a single trailing newline.
wgkey_regex = re.compile(r'^[A-Za-z0-9/+=]{44}$')
|
|
|
|
@blueprint.route('/')
@flask_login.login_required
def index():
    # Landing page of the VPN section; only reachable when logged in.
    template = 'vpn/index.html'
    return flask.render_template(template)
|
|
|
|
@blueprint.route('/custom')
@flask_login.login_required
def custom():
    # Admin-only page listing custom keys, i.e. entries in the 'wireguard'
    # database that have a non-empty 'networks' value and no 'user'.
    viewer = flask_login.current_user
    if not viewer.is_admin:
        return flask.Response('forbidden', status=403, mimetype='text/plain')

    with db.locked():
        keys = {}
        for address, entry in db.read('wireguard').items():
            # Skip keys without networks and keys that belong to a user.
            if not entry.get('networks') or entry.get('user'):
                continue
            keys[address] = entry
        # Rendering happens while the lock is held so that ipsets.read() runs
        # against the same database state as the key list.
        # NOTE(review): confirm ipsets.read() is meant to be called under db.locked().
        return flask.render_template('vpn/custom.html', keys=keys, ipsets=ipsets.read().keys())
|
|
|
|
@blueprint.route('/list')
@flask_login.login_required
def list():
    # Return logged-in user’s keys, marking the key used for current connection (if any).
    #
    # Each returned object is the stored key data plus:
    #   'ip'     – the address the key is stored under in the 'wireguard' database,
    #   'active' – True when the request’s remote address lies inside the key’s
    #              IPv4 address or its IPv6 subnet.
    user = flask_login.current_user.get_id()

    # The remote address may be missing or unparsable; in that case no key can
    # be marked active, but the listing itself should still work.
    try:
        remote_addr = ipaddress.ip_address(flask.request.remote_addr)
    except ValueError:
        remote_addr = None

    def is_active(ip, data):
        if remote_addr is None:
            return False
        # 'ip6' is stored as None for keys created without an IPv6 pool, and
        # ip_network(None) raises ValueError, so skip empty values.
        # A whole subnet is registered per key for IPv6, hence the containment
        # test instead of a string comparison.
        return any(remote_addr in ipaddress.ip_network(addr)
                   for addr in (ip, data.get('ip6')) if addr)

    return flask.jsonify([
        data | {'ip': ip, 'active': is_active(ip, data)}
        for ip, data in db.load('wireguard').items() if data.get('user') == user
    ])
|
|
|
|
@blueprint.route('/list-custom')
@flask_login.login_required
def list_custom():
    # Return all custom keys as JSON (admin only).
    # A custom key has a non-empty 'networks' value and no 'user'; each
    # returned object is the stored data plus its database address as 'ip'.
    if not flask_login.current_user.is_admin:
        return flask.Response('forbidden', status=403, mimetype='text/plain')

    custom_keys = []
    for address, entry in db.load('wireguard').items():
        if not entry.get('networks') or entry.get('user'):
            continue
        custom_keys.append(entry | {'ip': address})
    return flask.jsonify(custom_keys)
|
|
|
|
@blueprint.route('/new', methods=('POST',))
@flask_login.login_required
def new():
    # Register a new WireGuard key and return a rendered client configuration.
    #
    # Request JSON: 'pubkey' (required), 'options' (optional dict with 'name',
    # 'use-dns', 'add-default') and, for admins, 'networks' to create a custom
    # key instead of a user key.
    # Responses: 400 'invalid key' for a malformed pubkey, 500 when the IPv4
    # pool is exhausted, otherwise the rendered 'vpn/wg-fri.conf' template.
    #
    # Each key is assigned a new IPv4 address from the pool settings['wg_net'].
    # Each key is then assigned a corresponding IPv6 subnet, depending on the amount of surplus addresses available.
    # For wg_net 10.10.0.0/18 and wg_net6 1234:5678:90ab:cdef::/64,
    # the key for 10.10.0.10/32 would get 1234:5678:90ab:cdef:a::/80.
    def ipv4to6(net4, ip4, net6):
        # Calculate the address and prefix length for the IPv6 network that can be assigned to this key.
        len4 = net4.max_prefixlen - net4.prefixlen
        len6 = net6.max_prefixlen - net6.prefixlen
        # Make sure the network address ends at a colon. Wastes some addresses but IPv6.
        assigned = (len6 - len4) - (len6 - len4) % 16
        # Position of the IPv4 address inside its pool selects the IPv6 subnet.
        offset = int(ip4) - int(net4.network_address)
        ip6 = (net6.network_address + (offset << assigned)).compressed
        return ip6 + '/' + str(net6.max_prefixlen - assigned)

    pubkey = flask.request.json.get('pubkey', '')
    options = flask.request.json.get('options', {})
    if not isinstance(options, dict):
        options = {}
    # fullmatch (unlike match with '$') also rejects a key followed by a newline.
    if not isinstance(pubkey, str) or not wgkey_regex.fullmatch(pubkey):
        return flask.Response('invalid key', status=400, mimetype='text/plain')
    name = options.get('name', '')
    if not isinstance(name, str):
        name = ''

    # Read server’s private key and find the corresponding public key.
    # The command is passed as an argument vector, so no shell is involved;
    # a missing wg binary now raises instead of yielding an empty key.
    settings = db.load('settings')
    server_pubkey = subprocess.run(['wg', 'pubkey'], input=settings.get('wg_key'),
                                   text=True, capture_output=True).stdout.strip()

    # Use an aware UTC datetime: timestamp() on a naive utcnow() value would be
    # interpreted as local time and shift the stored epoch by the UTC offset.
    now = datetime.datetime.now(datetime.timezone.utc)
    key = {
        'key': pubkey,
        'time': now.timestamp(),
        'name': re.sub('[^-._A-Za-z0-9]', '', name),
    }

    # Determine IPv4 and IPv6 addresses for the new key.
    host = ipaddress.ip_interface(settings.get('wg_net') or '10.0.0.1/24')
    wg_net6 = settings.get('wg_net6')
    net6 = ipaddress.ip_interface(wg_net6).network if wg_net6 else None
    key['ip6'] = None
    with db.locked():
        # Find a free address for the new key.
        keys = db.read('wireguard')
        for ip in host.network.hosts():
            if ip != host.ip and str(ip) not in keys:
                if net6 is not None:
                    key['ip6'] = ipv4to6(host.network, ip, net6)
                break
        else:
            return flask.Response('no more available IP addresses', status=500, mimetype='text/plain')

        # Add remaining attributes to new key and update key database.
        networks = flask.request.json.get('networks')
        if flask_login.current_user.is_admin and networks:
            key['networks'] = networks
        else:
            key['user'] = flask_login.current_user.get_id()
        keys[str(ip)] = key
        db.write('wireguard', keys)

    # Generate a new config archive for firewall nodes.
    system.run(system.save_config)

    # Template arguments.
    args = {
        'server': settings.get('wg_endpoint'),
        'port': settings.get('wg_port') or '51820',
        'server_key': server_pubkey,
        'pubkey': pubkey,
        # The template keeps receiving a naive datetime holding UTC, as before.
        'timestamp': now.replace(tzinfo=None),
        'ip': str(ip),
        'ip6': key['ip6'],
        'name': key['name'],
        'user': key.get('user', 'custom'),
        'dns': settings.get('wg_dns', '') if options.get('use-dns', True) else False,
        'allowed_nets': settings.get('wg_allowed_nets', ''),
        'add_default': options.get('add-default', False),
    }
    return flask.render_template('vpn/wg-fri.conf', **args)
|
|
|
|
@blueprint.route('/del', methods=('POST',))
@flask_login.login_required
def delete():
    # Delete the key given as 'pubkey' in the request JSON.
    # A key is removed when it matches pubkey and either belongs to the
    # logged-in user or the logged-in user is an admin.
    # Responses: 400 'invalid key' for a malformed pubkey, otherwise 200
    # (also when no stored key matched).
    pubkey = flask.request.json.get('pubkey', '')
    # fullmatch (unlike match with '$') also rejects a key followed by a newline;
    # the type check keeps non-string input from raising inside the regex call.
    if not isinstance(pubkey, str) or not wgkey_regex.fullmatch(pubkey):
        return flask.Response('invalid key', status=400, mimetype='text/plain')

    user = flask_login.current_user.get_id()
    is_admin = flask_login.current_user.is_admin
    with db.locked():
        keys = {
            ip: data for ip, data in db.read('wireguard').items()
            if not (data.get('key') == pubkey and (data.get('user') == user or is_admin))
        }
        db.write('wireguard', keys)

    # Generate a new config archive for firewall nodes.
    system.run(system.save_config)

    return flask.Response(f'deleted key {pubkey}', status=200, mimetype='text/plain')
|