95 lines
4.2 KiB
Python
95 lines
4.2 KiB
Python
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
from sys import argv
|
|
from tempfile import TemporaryDirectory
|
|
from requests import get
|
|
import logging
|
|
from time import sleep
|
|
import subprocess
|
|
|
|
class Main():
|
|
def __init__(self, tmpdir, logger):
|
|
self.tmpdir = tmpdir
|
|
self.logger = logger
|
|
self.base_url = 'https://www.ipdeny.com/ipblocks/data/aggregated'
|
|
|
|
def run_sys_cmd(self, cmd):
|
|
out, err = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
|
if err.decode() != '':
|
|
raise SystemError(err.decode())
|
|
return out.decode()
|
|
|
|
def update_iplists(self, countries: list):
|
|
success_coutries = []
|
|
for c in countries:
|
|
cl = c.lower()
|
|
self.logger.info('Attempting to get list for %s', cl)
|
|
resp = get(f'{self.base_url}/{cl}-aggregated.zone', timeout=30)
|
|
if resp.status_code != 200 and resp.status_code != 404:
|
|
raise ValueError('Expected 200, got', resp.status_code)
|
|
elif resp.status_code == 404:
|
|
self.logger.warning('List for %s not found, skipping', cl)
|
|
else:
|
|
with open(f'{self.tmpdir}/iplist-{cl}.txt', 'a', encoding='utf8') as list_file:
|
|
list_file.writelines(resp.text)
|
|
success_coutries.append(cl)
|
|
return success_coutries
|
|
|
|
|
|
def update_firewalld(self, countries: list):
|
|
exist_ipsets = self.run_sys_cmd(['firewall-cmd', '--permanent', '--get-ipsets']).split()
|
|
changes = False
|
|
for c in countries:
|
|
ipset = f'geoip-block-{c}'
|
|
self.logger.info('Updating %s', ipset)
|
|
if ipset not in exist_ipsets:
|
|
self.logger.info('Creating ipset %s', ipset)
|
|
self.run_sys_cmd(['firewall-cmd', '--permanent', f'--new-ipset={ipset}', '--type=hash:net'])
|
|
|
|
currently_blocked = [f'{x.strip()}\n' for x in self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', '--get-entries']).split()]
|
|
self.logger.info('ipset contains %s entries', len(currently_blocked))
|
|
with open(f'{self.tmpdir}/iplist-{c}.txt', 'r', encoding='utf8') as list_file:
|
|
to_block = [f'{x.strip()}\n' for x in list_file.readlines()]
|
|
|
|
to_add = [x for x in to_block if x not in currently_blocked]
|
|
to_rem = [x for x in currently_blocked if x not in to_block]
|
|
|
|
if len(to_add) + len(to_rem) == 0:
|
|
self.logger.info('Already up to date')
|
|
continue
|
|
|
|
changes = True
|
|
|
|
self.logger.info('Planning to add %s and remove %s entries from %s', len(to_add), len(to_rem), ipset)
|
|
|
|
with open(f'{self.tmpdir}/add.txt', 'w', encoding='utf8') as add_file:
|
|
add_file.writelines(to_add)
|
|
with open(f'{self.tmpdir}/rem.txt', 'w', encoding='utf8') as rem_file:
|
|
rem_file.writelines(to_rem)
|
|
|
|
self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', f'--add-entries-from-file={self.tmpdir}/add.txt'])
|
|
self.run_sys_cmd(['firewall-cmd', '--permanent', f'--ipset={ipset}', f'--remove-entries-from-file={self.tmpdir}/rem.txt'])
|
|
try:
|
|
self.run_sys_cmd(['firewall-cmd', '--permanent', '--zone=drop', f'--add-source=ipset:{ipset}'])
|
|
except SystemError:
|
|
pass
|
|
if changes:
|
|
self.logger.info('Reloading FirewallD - This will take a while')
|
|
self.run_sys_cmd(['firewall-cmd', '--reload'])
|
|
|
|
|
|
def main(self):
|
|
my_name = argv[0]
|
|
parser = argparse.ArgumentParser(my_name, description='Tool to apply GeoIP firewall blocking with FirewallD')
|
|
parser.add_argument('-c', '--countries', nargs='+', required=True, help='Two letter country codes to block')
|
|
args = parser.parse_args()
|
|
req_countries = args.countries
|
|
self.update_firewalld(self.update_iplists(req_countries))
|
|
|
|
if __name__ == '__main__':
|
|
logger = logging.getLogger(__name__)
|
|
logging.basicConfig(level=logging.DEBUG, format="[%(asctime)s] %(levelname)s %(message)s")
|
|
with TemporaryDirectory(prefix='geoip-') as tempdir:
|
|
Main(tempdir, logger).main()
|