cancel
Showing results for 
Search instead for 
Did you mean: 
cancel
205
Views
0
Helpful
0
Replies

Python Script to update Object-Groups on Cisco ASA Firewalls

Bernd Nies
Level 1
Level 1

Hi,

To give something to the community, because it helped me today: I wrote this script to help in recent DDoS attack of our ASA firewalls. It can read list of IP addresses via HTTP URL or from local file and update defined object-groups on multiple ASA firewalls in single or multicontext mode. Runs with Python 3.9.18 on Linux. No warranty.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


CFG_DEFAULTS = {
    "object-groups": {},
    "exclude-networks": [ "0.0.0.0/8", "255.255.255.255/32" ],
    "session-log": "output_{host}.log",
    "defaults": {
        "device-type": "cisco_asa",
        "username": False,
        "password": False,
        "enable-level": 5,
        "enable-secret": False,
        "read-timeout": 30,
        "contexts": []
    },
    "hosts": {}
}


import argparse
import ipaddress
import json
import yaml
import os
import re
import requests
import socket
import sys
import time

from datetime import datetime
from netmiko import ConnectHandler
from pprint import pprint
from urllib.parse import urlparse




# ----------------------------------------------------------------------------
def read_yamlfile(file_path):
    try:
        with open(file_path, 'r') as file:
            data = yaml.safe_load(file)
        return data
    except yaml.YAMLError as e:
        print(f"YAML format error: {e}")
        sys.exit(1) 


# ----------------------------------------------------------------------------
def is_multicontext(conn):
    """Returns true if ASA is in multicontext mode."""
    output = conn.send_command("show mode")
    return re.search(r'Security context mode: multiple', output)


# ----------------------------------------------------------------------------
def get_ips_from_objectgroup(conn, objectgroup):
    """
    Parses ASA network object-group, matches all network-objects matching
    hosts or network addresses. Following ASA network-objects are identical:
    network-object x.x.x.x 255.255.255.255
    network-object host x.x.x.x
    Returns a set (unique list) of IPv4Network objects.
    """
    print("Fetching ... ", end="", flush=True)
    ip_list = set()
    time_start = int(time.time())
    pattern = re.compile(r'network-object\s+(host|\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)')
    output = conn.send_command(f"show running-config object-group id {objectgroup}")
    for line in output.split('\n'):
        if match := pattern.search(line):
            if match.group(1) == "host":
                address = match.group(2)
                netmask = 32
            else:
                address = match.group(1)
                netmask = match.group(2)
            ip_list.add(ipaddress.IPv4Network(f"{address}/{netmask}"))
    time_delta = int(time.time()) - time_start
    print(f"Read in {time_delta}s. Total {len(ip_list)} addresses. ", end="", flush=True)
    return ip_list



# ----------------------------------------------------------------------------
def validate_ipv4(ip_str, exclude_networks):
    """
    Check if IP address given as string (format x.x.x.x for single IP or
    as network in CIDR syntax x.x.x.x/y is a valid IPv4 address, excluding
    networks given as array exclude_networks. Returns an IPv4Network object or
    false otherwise.
    """
    try:
        ip = ipaddress.IPv4Network(ip_str)
        for exclude in exclude_networks:
            if ip.overlaps(ipaddress.IPv4Network(exclude)):
                return False
        return ip
    except:
        return False



# ----------------------------------------------------------------------------
def fetch_addresslist(url, exclude_networks):
    """
    Returns a set (unique list) of IPv4Network objects fetched from given
    URL without addresses matching exclude_networks.
    """
    ip_list = set() 
    pattern = re.compile(r'^\s*(\d+\.\d+\.\d+\.\d+/?\d*)')
    content = ""
    scheme = urlparse(url).scheme

    try:
        if scheme in ["http","https"]:
            response = requests.get(url)
            response.raise_for_status()
            content = response.text
        elif scheme in ["","file"]:
            file_path = urlparse(url).path
            with open(file_path, 'r') as file:
                content = file.read()
        else:
            print(f"Not supported scheme: {scheme}")

        for line in content.splitlines():
            if match := pattern.match(line):
                if ip := validate_ipv4(match.group(1), exclude_networks):
                    ip_list.add(ip)

    except requests.exceptions.ConnectionError:
        print("A connection error occured!")
    except requests.exceptions.Timeout:
        print("The request timed out.")
    except requests.exceptions.HTTPError as e:
        print("HTTP Error:", e)
    except requests.exceptions.RequestException as e:
        print("An error occurred:", e)
    except FileNotFoundError as e:
        print("File not found.", e)
    except PermissionError as e:
        print("Permission denied to access the file.", e)
    except IOError as e:
        print(f"Error reading the file: {e}")
    return ip_list



# ----------------------------------------------------------------------------
def get_asa_network_object(ip):
    """
    Argument is an IPv4Network object. Returns an ASA network-object string
    matching host or network. Returns false if no valid IPv4Network object.
    """
    try:
        address = ip.network_address
        netmask = ip.netmask
        if ip.num_addresses == 1: 
            return f"network-object host {address}" 
        else:
            return f"network-object {address} {netmask}"
    except:
        return False



# ----------------------------------------------------------------------------
def update_objectgroups(conn, objectgroups):
    """
    Find the differences between the object-group on the ASA and the list
    of IPv4Network objects. Then apply only the changes.
    """
    commands = []
    for group in objectgroups:
        print(f"- Object-group {group}: ", end="", flush=True)
        asa_iplist = set(get_ips_from_objectgroup(conn, group))
        new_iplist = set(objectgroups[group]["addresses"])

        ips_to_add = new_iplist - asa_iplist
        ips_to_remove = asa_iplist - new_iplist
        nr_add = len(ips_to_add)
        nr_del = len(ips_to_remove)

        if len(new_iplist) == 0:
            print("Is empty. Skipping.")
        elif nr_add + nr_del == 0:
            print("No changes.")
        else:
            print(f"Adding {nr_add}. removing {nr_del}.")
            description = objectgroups[group]["description"]
            commands.append(f"failover exec active object-group network {group}")
            commands.append(f"failover exec active description {description}")
            for ip in ips_to_add:
                if object := get_asa_network_object(ip):
                    commands.append(f"failover exec active {object}")
            for ip in ips_to_remove:
                if object := get_asa_network_object(ip):
                    commands.append(f"failover exec active no {object}")
            commands.append(f"end") 
            commands.append(f"failover exec active write memory")
    if commands:
        time_start = int(time.time())
        #pprint(commands, width=200)
        conn.send_config_set(commands)
        time_delta = int(time.time()) - time_start
        print(f"- Commands executed in {time_delta}s.")


# ----------------------------------------------------------------------------
def get_logindata(host, cfg):
    """
    Returns a dict with logindata required by Netmiko's ConnectHandler.
    Refer to API documentation at https://github.com/ktbyers/netmiko.
    """
    logindata = {
        "host":                  host,
        "device_type":           cfg["hosts"][host].get("device-type",   cfg["defaults"]["device-type"]),
        "username":              cfg["hosts"][host].get("username",      cfg["defaults"]["username"]),
        "password":              cfg["hosts"][host].get("password",      cfg["defaults"]["password"]),
        "secret":                cfg["hosts"][host].get("enable-secret", cfg["defaults"]["enable-secret"]),
        "read_timeout_override": cfg["hosts"][host].get("read-timeout",  cfg["defaults"]["read-timeout"]),
        "session_log":           cfg["session-log"].format(host=host)
    }
    return logindata


# ----------------------------------------------------------------------------
def read_configfile(file_path):
    """
    Read configuration file in YAML format and set default values if not
    defined. Simplifies further code and error checking. :-)
    Returns a dict with the configuration parameters.
    """
    cfg = read_yamlfile(file_path)
    for key in CFG_DEFAULTS:
        cfg[key] = cfg.get(key, CFG_DEFAULTS[key])
    for key in CFG_DEFAULTS["defaults"]:
        cfg["defaults"][key] = cfg["defaults"].get(key, CFG_DEFAULTS["defaults"][key])
    return cfg


# ----------------------------------------------------------------------------
def fetch_objectgroups(cfg, groups):
    """Fetch IP addresses per object-groups from given URLs."""
    print("Fetching object-groups ...", flush=True)
    objectgroups = {}
    now = datetime.now()
    hostname = socket.gethostname()
    for group in groups:
        source = cfg["object-groups"][group]
        description = "Updated on {} by {} from {}".format(now, hostname, source)
        time_start = int(time.time())
        addresses = fetch_addresslist(source, cfg["exclude-networks"])
        time_delta = int(time.time()) - time_start
        print(f"- {group}: {source}. Read in {time_delta}s. Total {len(addresses)} addresses. ", flush=True)
        objectgroups[group] = {
            "description": description,
            "addresses": addresses
        }
    return objectgroups


# -----------------------------------------------------------------------------
def get_arguments():
    """Read commandline arguments."""
    parser = argparse.ArgumentParser(
        description="Update object-groups on the Cisco firewalls.")
    parser.add_argument(
        '--config', required=True, metavar="FILENAME",
        help="Configuration file in YAML format.")
    parser.add_argument('--hosts', required=False,
        nargs='+', metavar="HOSTNAME",
        help="""Select hosts to be updated as listed in the YAML config file.
        If not used or set to 'all', all configured hosts are updated.""")
    parser.add_argument('--groups', required=False,
        nargs='+', metavar="OBJECTGROUP",
        help="""Select object-groups to be updated as listed in the YAML
        config file. If not used or set to 'all', all configured object-groups
        are updated.""")
    args = parser.parse_args()
    return(args)


# ----------------------------------------------------------------------------
def validate_hosts(cfg, args):
    hosts = []
    if not args.hosts:
        hosts = cfg["hosts"].keys() 
    elif set(args.hosts).issubset(set(cfg["hosts"].keys())):
        hosts = args.hosts
    elif len(args.hosts) == 1 and args.hosts[0] == "all":
        hosts = cfg["hosts"].keys()
    else:
        missing = set(args.hosts) - set(cfg["hosts"].keys())
        sys.exit(f"Host not allowed: {missing}")
    return list(set(hosts))


# ----------------------------------------------------------------------------
def validate_groups(cfg, args):
    groups = []
    if not args.groups:
        groups = cfg["object-groups"].keys()
    elif set(args.groups).issubset(set(cfg["object-groups"].keys())):
        groups = args.groups
    elif len(args.groups) == 1 and args.groups[0] == "all":
        groups = cfg["object-groups"].keys()
    else:
        missing = set(args.groups) - set(cfg["object-groups"].keys())
        sys.exit(f"Object-group not allowed: {missing}")
    return list(set(groups))


# ----------------------------------------------------------------------------
def main():
    args = get_arguments()
    cfg = read_configfile(args.config)
    hosts = validate_hosts(cfg, args) 
    groups = validate_groups(cfg, args) 
    objectgroups = fetch_objectgroups(cfg, groups)

    # Connect to each firewall and apply changes.      
    for host in hosts:
        print(f"\nConnecting to {host} ...") 
        logindata = get_logindata(host, cfg)
        contexts = cfg["hosts"][host].get("contexts", cfg["defaults"]["contexts"])
        with ConnectHandler(**logindata) as conn:
            if is_multicontext(conn) and contexts:
                for context in contexts:
                    print(f"Context {context}:")
                    conn.send_command(f"changeto context {context}")
                    update_objectgroups(conn, objectgroups)
            else:
                update_objectgroups(conn, objectgroups)
            conn.disconnect()


    
if __name__ == "__main__":
    main()

Configuration file in YAML format.

---
# ----------------------------------------------------------------------------
# Cisco ASA FIREWALLS OBJECT-GROUP UPDATE CONFIGURATION
# ----------------------------------------------------------------------------
#
# Use yamllint <thisfile> for syntax checking after editing this file.
#
# Define the object-groups to be updated from a list of IP addresses given by
# an URL. The access-list where these object-groups are being used must
# manually configured on the Cisco ASA firewall.
#
object-groups:
  TOR_Network_Exitpoints: https://check.torproject.org/torbulkexitlist
  VPN_Blacklist: /usr/local/etc/asa_vpn_blacklist.txt


# The networks to be excuded from processing. Most of them are defined by
# RFC 5735 https://datatracker.ietf.org/doc/html/rfc5735.html
# This is to prevent accidental lockout or unexpected behaviour (e. g.
# network range 0.0.0.0/0 is smuggled into the feeded list.
#
exclude-networks:
  - 0.0.0.0/8           # RFC 1122: "This" Network
  - 10.0.0.0/8          # RFC 1918: Private-Use Networks
  - 127.0.0.0/8         # RFC 1122: Loopback
  - 169.254.0.0/16      # RFC 3927: Link Local
  - 172.16.0.0/12       # RFC 1918: Private-Use Networks
  - 192.0.0.0/24        # RFC 5736: IETF Protocol Assignments
  - 192.0.2.0/24        # RFC 5737: TEST-NET-1
  - 192.88.99.0/24      # RFC 3068: 6to4 Relay Anycast
  - 192.168.0.0/16      # RFC 1918: Private-Use Networks
  - 198.18.0.0/15       # RFC 2544: Network Interc. Device Benchmark Testing
  - 198.51.100.0/24     # RFC 5737: TEST-NET-2
  - 203.0.113.0/24      # RFC 5737: TEST-NET-3
  - 224.0.0.0/4         # RFC 3171: Multicast
  - 240.0.0.0/4         # RFC 1112: Reserved for Future Use


# Where to log commands sent via Netmiko over SSH. The {host} is expanded to
# the actual hostname.
#
session-log: /var/log/asa/output_{host}.log


# Defaults to be used for all the hosts defined further below. They can be
# overwritten when needed. The device type is from Python netmiko library
# See https://github.com/ktbyers/netmiko for more. The enable-level setting
# is currently not implemented in netmiko.
#
defaults:
  device-type: cisco_asa
  read-timeout: 300
  username: some-admin
  password: "*****************"
  enable-level: 5


# Cisco ASA firewalls and contexts to be processed. Default parameters can be
# overwritten if needed.
#
hosts:
  firewall-1.example.com:
    enable-secret: "********"
    contexts: [web1, web2]

  firewall-2.example.com:
    enable-secret: "********"
    contexts: [web1, web2]

  firewall-3.example.com:
    enable-secret: "********"
    contexts: [web1, web2]

 

0 Replies 0
Review Cisco Networking products for a $25 gift card