#!/usr/bin/env python3 import argparse import logging import os import re import signal import subprocess import sys import time DEFAULT_REMOTE_HOST = '4.2.2.2' TAG_IN_PATTERN = re.compile(r"^<(.*)>$") TAG_OUT_PATTERN = re.compile(r"^$") def check_connection(remote_host=DEFAULT_REMOTE_HOST) -> bool: """ Check internet connection by pinging remote_host """ ping_proc = subprocess.run(['ping', '-c', '5', remote_host], capture_output=True) if ping_proc.stdout: logging.info("ping stdout:\n%s", ping_proc.stdout.decode().strip()) if ping_proc.stderr: logging.info("ping stderr:\n%s", ping_proc.stderr.decode().strip()) return ping_proc.returncode == 0 def get_first_route(remote_host=DEFAULT_REMOTE_HOST): """ Get first route used in tracerouting to remote_host""" tracert_proc = subprocess.run( ['traceroute', '-m', '1', remote_host], capture_output=True ) if tracert_proc.stdout: logging.info("traceroute stdout:\n%s", tracert_proc.stdout.decode().strip()) if tracert_proc.stderr: logging.info("traceroute stderr:\n%s", tracert_proc.stderr.decode().strip()) first_route = tracert_proc.stdout.decode().split()[1] return first_route def run_vpn(iface) -> bool: logging.info("Removing %s routes before starting VPN client", iface) if not delete_iface_routes(iface): return False logging.warning("Bringing up VPN interface %s", iface) res = subprocess.run(['sh', '/etc/netstart', iface]) if res.returncode: logging.error("Failed to bring up VPN interface %s", iface) return False return True def kill_vpn_client(vpnclient_pid): try: logging.warning("Killing VPN client process %s", vpnclient_pid) os.kill(vpnclient_pid, signal.SIGTERM) except ProcessLookupError: pass time.sleep(5) logging.warning("VPN client process %s is killed", vpnclient_pid) def delete_iface_routes(iface) -> bool: logging.info("Fetching route table") out = subprocess.check_output(["netstat", "-rn", "-finet"]).decode() out = out.splitlines() for line in out: if iface in line: logging.debug("Processing route line: %s", line) route = line.split()[0] logging.warning("Removing route %s for iface %s", route, iface) res = subprocess.run(['route', 'delete', route]) if res.returncode: logging.error("Failed to remove route %s for iface %s", route, iface) return True def get_pid_by_str(search_str): """ Search pid by its command line """ logging.info("Searching process by string: %s", search_str) out = subprocess.check_output(['ps', '-A', '-o pid,command']) out = out.decode() for line in out.splitlines(): if "openvpn" not in line: continue pid, cmd = line.strip().split(' ', 1) if search_str in cmd.lower(): logging.info("Process found: %s", line) return int(pid) logging.info("No processes are found: %s", search_str) return None def parse_ovpn_config(path): logging.info("Reading ovpn config file: %s", path) with open(path) as fp: content = str(fp.read()) config = dict() in_tag = None for line in content.splitlines(): logging.debug("Reading line: %s", line) line = line.strip() # skip comments and empty lines if line.startswith("#") or not line: continue # handle exit tag lines () line_re = TAG_OUT_PATTERN.match(line) if line_re: in_tag = None continue # handle enter tag lines () line_re = TAG_IN_PATTERN.match(line) if line_re: in_tag = line_re.group(1) config[in_tag] = str() continue # handle content inside tags if in_tag is not None: config[in_tag] += line continue # handle common lines if " " not in line: key, value = line, True else: key, value = line.split(" ", 1) config[key] = value logging.info("Config parsed successfully: %s", path) return config def run_vpn_checks(remote_host=DEFAULT_REMOTE_HOST, route_prefix='') -> bool: """ Run some tests to check VPN connection """ logging.info("Checking internet connection") if not check_connection(remote_host=remote_host): logging.warning("Remote host %s is not available through ICMP", remote_host) return False if route_prefix: logging.info("Checking default route") first_route = get_first_route(remote_host=remote_host) if not first_route.startswith(route_prefix): logging.warning("Incorrect route to host %s (was %s, should be %s)", remote_host, first_route, route_prefix) return False return True def main(): parser = argparse.ArgumentParser(description="Check VPN routes.") parser.add_argument("ovpn_file", metavar="OVPN_FILE", help="path to OpenVPN client config") parser.add_argument("-v", "--verbose", action="store_true", help="print verbose output") parser.add_argument("-d", "--debug", action="store_true", help="print debug output") parser.add_argument("-p", "--route-prefix", help="VPN route prefix (which should exists if connection is fine)") parser.add_argument("-r", "--remote-host", help="remote host for checking connection", default=DEFAULT_REMOTE_HOST) args = parser.parse_args() loglevel = logging.WARNING if args.verbose: loglevel = logging.INFO if args.debug: loglevel = logging.DEBUG logging.basicConfig( # format="%(levelname)-5s %(asctime)s %(message)s", format="%(asctime)s %(message)s", datefmt="%Y-%m-%dT%H:%M:%S", level=loglevel ) logging.info("Starting with args: %s", args) config_name = os.path.basename(args.ovpn_file) try: config = parse_ovpn_config(args.ovpn_file) except FileNotFoundError as err: logging.error("%s: %s", err.strerror, args.ovpn_file) return err.errno vpn_good = False while not vpn_good: vpn_client_pid = get_pid_by_str(config_name) if not vpn_client_pid: logging.warning("VPN client %s is not running", config_name) if not run_vpn(config['dev']): logging.error("Failed to start VPN client %s", config_name) return 1 # TODO add real log checks instead of sleep logging.info("Wait some time before client starts") time.sleep(30) logging.warning("VPN client %s is running", config_name) if run_vpn_checks(remote_host=args.remote_host, route_prefix=args.route_prefix): vpn_good = True logging.warning("VPN connection %s is ok", config_name) else: logging.warning("VPN connection %s is unstable, need to restart", config_name) kill_vpn_client(vpn_client_pid) delete_iface_routes(config["dev"]) return 0 if __name__ == '__main__': sys.exit(main())