#!/usr/bin/python3
"""Apply GeoIP firewall blocking with FirewallD.

Downloads aggregated per-country network lists from ipdeny.com and syncs
them into permanent FirewallD ipsets that are bound to the ``drop`` zone.
"""
import argparse
import logging
import subprocess
from sys import argv
from tempfile import TemporaryDirectory
from time import sleep

from requests import get


class Main():
    """Download country IP lists and mirror them into FirewallD ipsets."""

    def __init__(self, tmpdir, logger):
        """Store the working directory and logger.

        Args:
            tmpdir: Directory for downloaded lists and add/remove scratch files.
            logger: Logger used for progress and warning messages.
        """
        self.tmpdir = tmpdir
        self.logger = logger
        # URL prefix; '<code>-aggregated.zone' is appended per country.
        self.base_url = 'https://www.ipdeny.com/ipblocks/data/aggregated'

    def run_sys_cmd(self, cmd):
        """Run a command and return its decoded standard output.

        Args:
            cmd: Command and arguments as an argv-style list (no shell).

        Returns:
            The command's stdout decoded to ``str``.

        Raises:
            SystemError: If the command wrote anything to stderr or exited
                with a non-zero status.
        """
        proc = subprocess.run(cmd, stdin=None, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, check=False)
        err = proc.stderr.decode()
        # Any stderr output counts as failure (callers rely on this); compare
        # by value, not identity.
        if err:
            raise SystemError(err)
        # A silent non-zero exit must not be mistaken for success.
        if proc.returncode != 0:
            raise SystemError(f'{cmd!r} exited with status {proc.returncode}')
        return proc.stdout.decode()

    def update_iplists(self, countries: list):
        """Download the aggregated zone list for each requested country.

        Each list is written to ``iplist-<code>.txt`` inside ``self.tmpdir``.
        Codes are lower-cased and a repeated code is fetched only once.

        Args:
            countries: Country codes as given on the command line.

        Returns:
            The lower-cased codes whose list was downloaded, in request order.

        Raises:
            ValueError: If the server answers with a status other than 200
                or 404 (404 is logged and the country is skipped).
        """
        success_countries = []
        # dict.fromkeys drops duplicates while keeping the original order.
        for cl in dict.fromkeys(c.lower() for c in countries):
            self.logger.info('Attempting to get list for %s', cl)
            resp = get(f'{self.base_url}/{cl}-aggregated.zone', timeout=30)
            if resp.status_code == 404:
                self.logger.warning('List for %s not found, skipping', cl)
                continue
            if resp.status_code != 200:
                raise ValueError(f'Expected 200, got {resp.status_code}')
            # Truncate rather than append so the file holds exactly one copy.
            with open(f'{self.tmpdir}/iplist-{cl}.txt', 'w', encoding='utf8') as list_file:
                list_file.write(resp.text)
            success_countries.append(cl)
        return success_countries

    def update_firewalld(self, countries: list):
        """Sync one ``geoip-block-<code>`` ipset per country with its list.

        For every country the permanent ipset is created if missing, the
        difference against ``iplist-<code>.txt`` in ``self.tmpdir`` is applied
        through add/remove files, and the ipset is added as a source of the
        ``drop`` zone. FirewallD is reloaded once if any ipset changed.

        Args:
            countries: Lower-cased country codes whose list file exists.

        Raises:
            SystemError: If a firewall-cmd call fails (except the best-effort
                zone binding, which is only logged).
        """
        exist_ipsets = self.run_sys_cmd(
            ['firewall-cmd', '--permanent', '--get-ipsets']).split()
        changes = False
        for c in countries:
            ipset = f'geoip-block-{c}'
            self.logger.info('Updating %s', ipset)
            if ipset not in exist_ipsets:
                self.logger.info('Creating ipset %s', ipset)
                self.run_sys_cmd(['firewall-cmd', '--permanent',
                                  f'--new-ipset={ipset}', '--type=hash:net'])

            currently_blocked = self.run_sys_cmd(
                ['firewall-cmd', '--permanent', f'--ipset={ipset}', '--get-entries']).split()
            self.logger.info('ipset contains %s entries', len(currently_blocked))

            with open(f'{self.tmpdir}/iplist-{c}.txt', 'r', encoding='utf8') as list_file:
                # Blank lines would otherwise be handed to firewall-cmd as entries.
                to_block = [entry for entry in (line.strip() for line in list_file) if entry]

            # Set lookups keep the diff linear; the lists preserve file order.
            current_set = set(currently_blocked)
            wanted_set = set(to_block)
            to_add = [x for x in to_block if x not in current_set]
            to_rem = [x for x in currently_blocked if x not in wanted_set]

            if not to_add and not to_rem:
                self.logger.info('Already up to date')
                continue

            changes = True
            self.logger.info('Planning to add %s and remove %s entries from %s',
                             len(to_add), len(to_rem), ipset)
            # Only invoke firewall-cmd for a direction that has work to do.
            if to_add:
                with open(f'{self.tmpdir}/add.txt', 'w', encoding='utf8') as add_file:
                    add_file.writelines(f'{x}\n' for x in to_add)
                self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}',
                                  f'--add-entries-from-file={self.tmpdir}/add.txt'])
            if to_rem:
                with open(f'{self.tmpdir}/rem.txt', 'w', encoding='utf8') as rem_file:
                    rem_file.writelines(f'{x}\n' for x in to_rem)
                self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}',
                                  f'--remove-entries-from-file={self.tmpdir}/rem.txt'])

            try:
                self.run_sys_cmd(['firewall-cmd', '--permanent', '--zone=drop',
                                  f'--add-source=ipset:{ipset}'])
            except SystemError as exc:
                # Best effort: presumably the usual cause is that the source is
                # already bound to the zone. Log it instead of hiding it.
                self.logger.debug('Could not add %s to drop zone: %s',
                                  ipset, str(exc).strip())

        if changes:
            self.logger.info('Reloading FirewallD - This will take a while')
            self.run_sys_cmd(['firewall-cmd', '--reload'])

    def main(self):
        """Parse the command line, download the lists and update FirewallD."""
        my_name = argv[0]
        parser = argparse.ArgumentParser(
            my_name, description='Tool to apply GeoIP firewall blocking with FirewallD')
        parser.add_argument('-c', '--countries', nargs='+', required=True,
                            help='Two letter country codes to block')
        args = parser.parse_args()
        self.update_firewalld(self.update_iplists(args.countries))


if __name__ == '__main__':
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.DEBUG,
                        format="[%(asctime)s] %(levelname)s %(message)s")
    with TemporaryDirectory(prefix='geoip-') as tempdir:
        Main(tempdir, logger).main()