From 29cba4dd9b236854662b0807ade98f4127ccf4c6 Mon Sep 17 00:00:00 2001 From: Jake Charman Date: Sat, 7 Jun 2025 17:26:45 +0100 Subject: [PATCH] First attempt --- geoip-block.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 geoip-block.py diff --git a/geoip-block.py b/geoip-block.py new file mode 100644 index 0000000..5cd45fa --- /dev/null +++ b/geoip-block.py @@ -0,0 +1,94 @@ +#!/usr/bin/python3 + +import argparse +from sys import argv +from tempfile import TemporaryDirectory +from requests import get +import logging +from time import sleep +import subprocess + +class Main(): + def __init__(self, tmpdir, logger): + self.tmpdir = tmpdir + self.logger = logger + self.base_url = 'https://www.ipdeny.com/ipblocks/data/aggregated' + + def run_sys_cmd(self, cmd): + out, err = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() + if err.decode() is not '': + raise SystemError(err.decode()) + return out.decode() + + def update_iplists(self, countries: list): + success_coutries = [] + for c in countries: + cl = c.lower() + self.logger.info('Attempting to get list for %s', cl) + resp = get(f'{self.base_url}/{cl}-aggregated.zone', timeout=30) + if resp.status_code != 200 and resp.status_code != 404: + raise ValueError('Expected 200, got', resp.status_code) + elif resp.status_code == 404: + self.logger.warning('List for %s not found, skipping', cl) + else: + with open(f'{self.tmpdir}/iplist-{cl}.txt', 'a', encoding='utf8') as list_file: + list_file.writelines(resp.text) + success_coutries.append(cl) + return success_coutries + + + def update_firewalld(self, countries: list): + exist_ipsets = self.run_sys_cmd(['firewall-cmd', '--permanent', '--get-ipsets']).split() + changes = False + for c in countries: + ipset = f'geoip-block-{c}' + self.logger.info('Updating %s', ipset) + if ipset not in exist_ipsets: + self.logger.info('Creating ipset %s', ipset) + self.run_sys_cmd(['firewall-cmd', '--permanent', f'--new-ipset={ipset}', '--type=hash:net']) + + currently_blocked = [f'{x.strip()}\n' for x in self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', '--get-entries']).split()] + self.logger.info('ipset contains %s entries', len(currently_blocked)) + with open(f'{self.tmpdir}/iplist-{c}.txt', 'r', encoding='utf8') as list_file: + to_block = [f'{x.strip()}\n' for x in list_file.readlines()] + + to_add = [x for x in to_block if x not in currently_blocked] + to_rem = [x for x in currently_blocked if x not in to_block] + + if len(to_add) + len(to_rem) == 0: + self.logger.info('Already up to date') + continue + + changes = True + + self.logger.info('Planning to add %s and remove %s entries from %s', len(to_add), len(to_rem), ipset) + + with open(f'{self.tmpdir}/add.txt', 'w', encoding='utf8') as add_file: + add_file.writelines(to_add) + with open(f'{self.tmpdir}/rem.txt', 'w', encoding='utf8') as rem_file: + rem_file.writelines(to_rem) + + self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', f'--add-entries-from-file={self.tmpdir}/add.txt']) + self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', f'--remove-entries-from-file={self.tmpdir}/rem.txt']) + try: + self.run_sys_cmd(['firewall-cmd', '--permanent', '--zone=drop', f'--add-source=ipset:{ipset}']) + except SystemError: + pass + if changes: + self.logger.info('Reloading FirewallD - This will take a while') + self.run_sys_cmd(['firewall-cmd', '--reload']) + + + def main(self): + my_name = argv[0] + parser = argparse.ArgumentParser(my_name, description='Tool to apply GeoIP firewall blocking with FirewallD') + parser.add_argument('-c', '--countries', nargs='+', required=True, help='Two letter country codes to block') + args = parser.parse_args() + req_countries = args.countries + self.update_firewalld(self.update_iplists(req_countries)) + +if __name__ == '__main__': + logger = logging.getLogger(__name__) + logging.basicConfig(level=logging.DEBUG, format="[%(asctime)s] %(levelname)s %(message)s") + with TemporaryDirectory(prefix='geoip-') as tempdir: + Main(tempdir, logger).main()