230 lines
7.3 KiB
Python
Executable File
230 lines
7.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
DEFAULT_REMOTE_HOST = '4.2.2.2'
|
|
TAG_IN_PATTERN = re.compile(r"^<(.*)>$")
|
|
TAG_OUT_PATTERN = re.compile(r"^</(.*)>$")
|
|
|
|
|
|
def check_connection(remote_host=DEFAULT_REMOTE_HOST) -> bool:
|
|
""" Check internet connection by pinging remote_host """
|
|
ping_proc = subprocess.run(['ping', '-c', '5', remote_host], capture_output=True)
|
|
if ping_proc.stdout:
|
|
logging.info("ping stdout:\n%s", ping_proc.stdout.decode().strip())
|
|
if ping_proc.stderr:
|
|
logging.info("ping stderr:\n%s", ping_proc.stderr.decode().strip())
|
|
return ping_proc.returncode == 0
|
|
|
|
|
|
def get_first_route(remote_host=DEFAULT_REMOTE_HOST):
|
|
""" Get first route used in tracerouting to remote_host"""
|
|
tracert_proc = subprocess.run(
|
|
['traceroute', '-m', '1', remote_host], capture_output=True
|
|
)
|
|
if tracert_proc.stdout:
|
|
logging.info("traceroute stdout:\n%s", tracert_proc.stdout.decode().strip())
|
|
if tracert_proc.stderr:
|
|
logging.info("traceroute stderr:\n%s", tracert_proc.stderr.decode().strip())
|
|
first_route = tracert_proc.stdout.decode().split()[1]
|
|
return first_route
|
|
|
|
|
|
def run_vpn(iface) -> bool:
|
|
logging.info("Removing %s routes before starting VPN client", iface)
|
|
if not delete_iface_routes(iface):
|
|
return False
|
|
|
|
logging.warning("Bringing up VPN interface %s", iface)
|
|
res = subprocess.run(['sh', '/etc/netstart', iface])
|
|
if res.returncode:
|
|
logging.error("Failed to bring up VPN interface %s", iface)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def kill_vpn_client(vpnclient_pid):
|
|
try:
|
|
logging.warning("Killing VPN client process %s", vpnclient_pid)
|
|
os.kill(vpnclient_pid, signal.SIGTERM)
|
|
except ProcessLookupError:
|
|
pass
|
|
time.sleep(5)
|
|
logging.warning("VPN client process %s is killed", vpnclient_pid)
|
|
|
|
|
|
def delete_iface_routes(iface) -> bool:
|
|
logging.info("Fetching route table")
|
|
out = subprocess.check_output(["netstat", "-rn", "-finet"]).decode()
|
|
out = out.splitlines()
|
|
|
|
for line in out:
|
|
if iface in line:
|
|
logging.debug("Processing route line: %s", line)
|
|
route = line.split()[0]
|
|
|
|
logging.warning("Removing route %s for iface %s", route, iface)
|
|
res = subprocess.run(['route', 'delete', route])
|
|
if res.returncode:
|
|
logging.error("Failed to remove route %s for iface %s", route, iface)
|
|
|
|
return True
|
|
|
|
|
|
def get_pid_by_str(search_str):
|
|
""" Search pid by its command line """
|
|
logging.info("Searching process by string: %s", search_str)
|
|
out = subprocess.check_output(['ps', '-A', '-o pid,command'])
|
|
out = out.decode()
|
|
|
|
for line in out.splitlines():
|
|
if "openvpn" not in line:
|
|
continue
|
|
pid, cmd = line.strip().split(' ', 1)
|
|
if search_str in cmd.lower():
|
|
logging.info("Process found: %s", line)
|
|
return int(pid)
|
|
|
|
logging.info("No processes are found: %s", search_str)
|
|
return None
|
|
|
|
|
|
def parse_ovpn_config(path):
|
|
logging.info("Reading ovpn config file: %s", path)
|
|
with open(path) as fp:
|
|
content = str(fp.read())
|
|
|
|
config = dict()
|
|
in_tag = None
|
|
for line in content.splitlines():
|
|
logging.debug("Reading line: %s", line)
|
|
line = line.strip()
|
|
|
|
# skip comments and empty lines
|
|
if line.startswith("#") or not line:
|
|
continue
|
|
|
|
# handle exit tag lines (</some_tag>)
|
|
line_re = TAG_OUT_PATTERN.match(line)
|
|
if line_re:
|
|
in_tag = None
|
|
continue
|
|
|
|
# handle enter tag lines (<some_tag>)
|
|
line_re = TAG_IN_PATTERN.match(line)
|
|
if line_re:
|
|
in_tag = line_re.group(1)
|
|
config[in_tag] = str()
|
|
continue
|
|
|
|
# handle content inside tags
|
|
if in_tag is not None:
|
|
config[in_tag] += line
|
|
continue
|
|
|
|
# handle common lines
|
|
if " " not in line:
|
|
key, value = line, True
|
|
else:
|
|
key, value = line.split(" ", 1)
|
|
config[key] = value
|
|
|
|
logging.info("Config parsed successfully: %s", path)
|
|
return config
|
|
|
|
|
|
def run_vpn_checks(remote_host=DEFAULT_REMOTE_HOST,
|
|
route_prefix='') -> bool:
|
|
""" Run some tests to check VPN connection """
|
|
|
|
logging.info("Checking internet connection")
|
|
if not check_connection(remote_host=remote_host):
|
|
logging.warning("Remote host %s is not available through ICMP", remote_host)
|
|
return False
|
|
|
|
if route_prefix:
|
|
logging.info("Checking default route")
|
|
first_route = get_first_route(remote_host=remote_host)
|
|
if not first_route.startswith(route_prefix):
|
|
logging.warning("Incorrect route to host %s (was %s, should be %s)",
|
|
remote_host, first_route, route_prefix)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Check VPN routes.")
|
|
parser.add_argument("ovpn_file",
|
|
metavar="OVPN_FILE",
|
|
help="path to OpenVPN client config")
|
|
parser.add_argument("-v", "--verbose", action="store_true",
|
|
help="print verbose output")
|
|
parser.add_argument("-d", "--debug", action="store_true",
|
|
help="print debug output")
|
|
parser.add_argument("-p", "--route-prefix",
|
|
help="VPN route prefix (which should exists if connection is fine)")
|
|
parser.add_argument("-r", "--remote-host",
|
|
help="remote host for checking connection",
|
|
default=DEFAULT_REMOTE_HOST)
|
|
args = parser.parse_args()
|
|
|
|
loglevel = logging.WARNING
|
|
if args.verbose:
|
|
loglevel = logging.INFO
|
|
if args.debug:
|
|
loglevel = logging.DEBUG
|
|
logging.basicConfig(
|
|
# format="%(levelname)-5s %(asctime)s %(message)s",
|
|
format="%(asctime)s %(message)s",
|
|
datefmt="%Y-%m-%dT%H:%M:%S",
|
|
level=loglevel
|
|
)
|
|
|
|
logging.info("Starting with args: %s", args)
|
|
|
|
config_name = os.path.basename(args.ovpn_file)
|
|
try:
|
|
config = parse_ovpn_config(args.ovpn_file)
|
|
except FileNotFoundError as err:
|
|
logging.error("%s: %s", err.strerror, args.ovpn_file)
|
|
return err.errno
|
|
|
|
vpn_good = False
|
|
while not vpn_good:
|
|
vpn_client_pid = get_pid_by_str(config_name)
|
|
if not vpn_client_pid:
|
|
logging.warning("VPN client %s is not running", config_name)
|
|
if not run_vpn(config['dev']):
|
|
logging.error("Failed to start VPN client %s", config_name)
|
|
return 1
|
|
|
|
# TODO add real log checks instead of sleep
|
|
logging.info("Wait some time before client starts")
|
|
time.sleep(30)
|
|
|
|
logging.warning("VPN client %s is running", config_name)
|
|
|
|
if run_vpn_checks(remote_host=args.remote_host,
|
|
route_prefix=args.route_prefix):
|
|
vpn_good = True
|
|
logging.warning("VPN connection %s is ok", config_name)
|
|
else:
|
|
logging.warning("VPN connection %s is unstable, need to restart", config_name)
|
|
kill_vpn_client(vpn_client_pid)
|
|
delete_iface_routes(config["dev"])
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|