Files
geoip-block/geoip-block.py
2025-06-07 17:30:03 +01:00

95 lines
4.2 KiB
Python

#!/usr/bin/python3
import argparse
from sys import argv
from tempfile import TemporaryDirectory
from requests import get
import logging
from time import sleep
import subprocess
class Main:
    """Download per-country IP lists and mirror them into FirewallD ipsets.

    Each requested country gets a permanent ipset named
    ``geoip-block-<code>`` which is attached as a source of the ``drop`` zone.
    """

    def __init__(self, tmpdir, logger):
        """Store the working directory, the logger and the list source URL.

        Args:
            tmpdir: Path of a writable directory used for the downloaded
                lists and for the entry files handed to ``firewall-cmd``.
            logger: Logger that receives progress messages.
        """
        self.tmpdir = tmpdir
        self.logger = logger
        self.base_url = 'https://www.ipdeny.com/ipblocks/data/aggregated'

    def run_sys_cmd(self, cmd):
        """Run *cmd* and return its decoded standard output.

        Args:
            cmd: Argument list passed to ``subprocess`` (no shell involved).

        Returns:
            Whatever the command wrote to stdout, decoded as text.

        Raises:
            SystemError: If the command wrote anything to stderr, or exited
                with a non-zero status.
        """
        proc = subprocess.run(
            cmd,
            stdin=None,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        err_text = proc.stderr.decode()
        # Callers rely on any stderr output being reported as SystemError.
        if err_text != '':
            raise SystemError(err_text)
        # A failure that printed nothing must not be mistaken for success.
        if proc.returncode != 0:
            raise SystemError(f'{cmd!r} exited with status {proc.returncode}')
        return proc.stdout.decode()

    def update_iplists(self, countries: list):
        """Download the aggregated zone file for every requested country.

        Each list is written to ``<tmpdir>/iplist-<code>.txt``.

        Args:
            countries: Country codes; they are lower-cased before use.

        Returns:
            The lower-cased codes whose list was downloaded, in request order
            and without repeats.

        Raises:
            ValueError: If the server answers with a status other than 200
                or 404.
        """
        fetched = []
        # dict.fromkeys drops repeated codes (e.g. "gb GB") but keeps order.
        for code in dict.fromkeys(c.lower() for c in countries):
            self.logger.info('Attempting to get list for %s', code)
            resp = get(f'{self.base_url}/{code}-aggregated.zone', timeout=30)
            if resp.status_code == 404:
                self.logger.warning('List for %s not found, skipping', code)
                continue
            if resp.status_code != 200:
                raise ValueError(f'Expected 200, got {resp.status_code}')
            # Truncate rather than append so the file holds one copy only.
            with open(f'{self.tmpdir}/iplist-{code}.txt', 'w', encoding='utf8') as list_file:
                list_file.write(resp.text)
            fetched.append(code)
        return fetched

    def _apply_entries(self, ipset, action, filename, entries):
        """Write *entries* to ``<tmpdir>/<filename>`` and add/remove them from *ipset*."""
        path = f'{self.tmpdir}/{filename}'
        with open(path, 'w', encoding='utf8') as entry_file:
            entry_file.writelines(f'{entry}\n' for entry in entries)
        self.run_sys_cmd([
            'firewall-cmd', '--permanent', f'--ipset={ipset}',
            f'--{action}-entries-from-file={path}',
        ])

    def update_firewalld(self, countries: list):
        """Bring the permanent FirewallD ipsets in line with the downloaded lists.

        For every code the ipset is created if missing, the difference between
        its current entries and ``<tmpdir>/iplist-<code>.txt`` is applied, and
        the ipset is added as a source of the ``drop`` zone. FirewallD is
        reloaded once at the end if any ipset was modified.

        Args:
            countries: Lower-cased country codes with a downloaded list.

        Raises:
            SystemError: If a ``firewall-cmd`` call fails, except for the
                drop-zone binding, which is best-effort.
        """
        exist_ipsets = set(
            self.run_sys_cmd(['firewall-cmd', '--permanent', '--get-ipsets']).split()
        )
        changes = False
        for c in countries:
            ipset = f'geoip-block-{c}'
            self.logger.info('Updating %s', ipset)
            if ipset not in exist_ipsets:
                self.logger.info('Creating ipset %s', ipset)
                self.run_sys_cmd(['firewall-cmd', '--permanent', f'--new-ipset={ipset}', '--type=hash:net'])

            currently_blocked = self.run_sys_cmd(
                ['firewall-cmd', '--permanent', f'--ipset={ipset}', '--get-entries']
            ).split()
            self.logger.info('ipset contains %s entries', len(currently_blocked))

            with open(f'{self.tmpdir}/iplist-{c}.txt', 'r', encoding='utf8') as list_file:
                # Skip blank lines and repeated networks; keep file order.
                to_block = list(dict.fromkeys(
                    line.strip() for line in list_file if line.strip()
                ))

            # Sets keep the diff linear; zone files can hold thousands of nets.
            blocked_set = set(currently_blocked)
            wanted_set = set(to_block)
            to_add = [net for net in to_block if net not in blocked_set]
            to_rem = [net for net in currently_blocked if net not in wanted_set]

            if not to_add and not to_rem:
                self.logger.info('Already up to date')
                continue

            changes = True
            self.logger.info('Planning to add %s and remove %s entries from %s', len(to_add), len(to_rem), ipset)
            if to_add:
                self._apply_entries(ipset, 'add', 'add.txt', to_add)
            if to_rem:
                self._apply_entries(ipset, 'remove', 'rem.txt', to_rem)

            # Best effort: firewall-cmd complains on stderr when the source is
            # already bound to the zone, which must not abort the run.
            try:
                self.run_sys_cmd(['firewall-cmd', '--permanent', '--zone=drop', f'--add-source=ipset:{ipset}'])
            except SystemError as err:
                if 'ALREADY_ENABLED' in str(err):
                    self.logger.debug('%s is already a source of the drop zone', ipset)
                else:
                    self.logger.warning('Could not add %s to the drop zone: %s', ipset, str(err).strip())

        if changes:
            self.logger.info('Reloading FirewallD - This will take a while')
            self.run_sys_cmd(['firewall-cmd', '--reload'])

    def main(self):
        """Parse the command line, download the lists and update FirewallD."""
        parser = argparse.ArgumentParser(
            argv[0],
            description='Tool to apply GeoIP firewall blocking with FirewallD',
        )
        parser.add_argument('-c', '--countries', nargs='+', required=True, help='Two letter country codes to block')
        args = parser.parse_args()
        self.update_firewalld(self.update_iplists(args.countries))
if __name__ == '__main__':
    # Script entry point: set up root logging, then run the tool inside a
    # throw-away working directory that is removed when the block exits.
    logging.basicConfig(
        format="[%(asctime)s] %(levelname)s %(message)s",
        level=logging.DEBUG,
    )
    logger = logging.getLogger(__name__)
    with TemporaryDirectory(prefix='geoip-') as workdir:
        app = Main(workdir, logger)
        app.main()