#!/usr/bin/python import sys import xml.dom.minidom import xml.parsers.expat DIR_ANY = 0 DIR_INCOMING = 1 DIR_OUTGOING = 2 DIR_PASSING = 3 PROTO_ANY = 0 PROTO_TCP = 1 PROTO_UDP = 2 PROTO_ICMP = 3 TARGET_DROP = 0 TARGET_ACCEPT = 1 TARGET_REJECT = 2 TARGET_QUEUE = 3 ruleCount = 0 outputFile = sys.stdout unmatchTarget = TARGET_QUEUE useTcpRst = False icmpRejectCode = 3 icmpTable = ["echo-reply", "pong", "destination-unreachable", "network-unreachable", "host-unreachable", "protocol-unreachable", "port-unreachable", "fragmentation-needed", "source-route-failed", "network-unknown", "host-unknown", "network-prohibited", "host-prohibited", "TOS-network-unreachable", "TOS-host-unreachable", "communication-prohibited", "host-precedence-violation", "precedence-cutoff", "source-quench", "redirect", "network-redirect", "host-redirect", "TOS-network-redirect", "TOS-host-redirect", "echo-request", "ping", "router-advertisement", "router-solicitation", "time-exceeded", "ttl-exceeded", "ttl-zero-during-transit", "ttl-zero-during-reassembly", "parameter-problem", "ip-header-bad", "required-option-missing", "timestamp-request", "timestamp-reply", "address-mask-request", "address-mask-reply"] def icmpRejectCodeValid(c): return c in [0, 1, 2, 3, 9, 10] def icmpRejectCodeToString(c): if c == 0: return "icmp-net-unreachable" elif c == 1: return "icmp-host-unreachable" elif c == 2: return "icmp-proto-unreachable" elif c == 9: return "icmp-net-prohibited" elif c == 10: return "icmp-host-prohibited" return "icmp-port-unreachable" def targetFromString(t): if t == "accept": return TARGET_ACCEPT elif t == "reject": return TARGET_REJECT elif t in ("query", "ask"): return TARGET_QUEUE else: return TARGET_DROP def targetToString(t): if t == TARGET_ACCEPT: return "ACCEPT" elif t == TARGET_REJECT: return "ffuser_reject" elif t == TARGET_QUEUE: return "QUEUE" else: return "DROP" def writeRule(rule): outputFile.write("### Rule %s\n" % rule["description"]) ipt = "" if rule.has_key("protocol"): if rule["protocol"] == PROTO_TCP or rule["protocol"] == PROTO_UDP: if rule["protocol"] == PROTO_UDP: ipt += " -p udp" else: ipt += " -p tcp" if rule.has_key("destination-port"): ipt += " --destination-port %i" % int(rule["destination-port"]) elif rule["protocol"] == PROTO_ICMP: ipt += " -p icmp" if rule.has_key("icmp-type"): if rule["icmp-type"] in icmpTable: ipt += " --icmp-type %s" % rule["icmp-type"] if rule.has_key("source"): ipt += " -s %s" % rule["source"]; if rule.has_key("destination"): ipt += " -d %s" % rule["destination"]; elif rule.has_key("broadcast"): ipt += " -m pkttype --pkt-type broadcast" elif rule.has_key("unicast"): ipt += " -m pkttype --pkt-type unicast" if rule["direction"] != DIR_ANY: if rule["direction"] in (DIR_PASSING, DIR_INCOMING) and rule.has_key("input-device"): ipt += " -i %s" % rule["input-device"] elif rule["direction"] in (DIR_PASSING, DIR_OUTGOING) and rule.has_key("output-device"): ipt += " -o %s" % rule["output-device"] ipt += " -j " + targetToString(rule["target"]) if rule["direction"] in (DIR_ANY, DIR_INCOMING): outputFile.write("iptables -A ffuser_in%s\n" % ipt) if rule["direction"] in (DIR_ANY, DIR_OUTGOING): outputFile.write("iptables -A ffuser_out%s\n" % ipt) if rule["direction"] in (DIR_ANY, DIR_PASSING): outputFile.write("iptables -A ffuser_for%s\n" % ipt) def getText(nodelist): rc = "" for node in nodelist: if node.nodeType == node.TEXT_NODE: rc = rc + node.data return rc def handleRule(rule): global ruleCount, outputFile ruleCount = ruleCount+1 try: id = int(rule.attributes["id"].value) except KeyError: id = 0 if not id: id = ruleCount r = {} r["id"] = id enabledl = rule.getElementsByTagName("disabled") if len(enabledl): return descl = rule.getElementsByTagName("description") if len(descl): r["description"] = "%s (#%i)" % (getText(descl[0].childNodes).strip(), id) else: r["description"] = "#%i" % id r["direction"] = DIR_ANY dir = rule.getElementsByTagName("direction") if len(dir): t = getText(dir[0].childNodes) if t == "incoming": r["direction"] = DIR_INCOMING elif t == "outgoing": r["direction"] = DIR_OUTGOING elif t in ("forwarding", "passing"): r["direction"] = DIR_PASSING proto = rule.getElementsByTagName("protocol") if len(proto): t = getText(proto[0].childNodes) if t == "icmp": r["protocol"] = PROTO_ICMP elif t == "tcp": r["protocol"] = PROTO_TCP elif t == "udp": r["protocol"] = PROTO_UDP r["icmp-type"] = "ping" r["target"] = TARGET_DROP target = rule.getElementsByTagName("target") if len(target): r["target"] = targetFromString(getText(target[0].childNodes)) src = rule.getElementsByTagName("source") if len(src): r["source"] = getText(src[0].childNodes).strip() dst = rule.getElementsByTagName("destination") if len(dst): r["destination"] = getText(dst[0].childNodes).strip() dstp = rule.getElementsByTagName("destination-port") if len(dstp): try: r["destination-port"] = int(getText(dstp[0].childNodes)) except: pass idev = rule.getElementsByTagName("input-device") if len(idev): r["input-device"] = getText(idev[0].childNodes) odev = rule.getElementsByTagName("output-device") if len(odev): r["output-device"] = getText(odev[0].childNodes) bc = rule.getElementsByTagName("broadcast") if len(bc): r["broadcast"] = 1 else: uc = rule.getElementsByTagName("unicast") if len(uc): r["unicast"] = 1 writeRule(r) def handleRuleset(ruleset): global outputFile, useTcpRst, icmpRejectCode, unmatchVerdict outputFile.write("#!/bin/sh\n\n" "### Try to deactivate chains\n" "iptables -D ffsys_in -j ffuser_in &> /dev/null\n" "iptables -D ffsys_out -j ffuser_out &> /dev/null\n" "iptables -D ffsys_for -j ffuser_for &> /dev/null\n\n" "set -e\n\n" "### Create chains\n" "iptables -F ffuser_in 2> /dev/null || iptables -N ffuser_in || exit 1\n" "iptables -F ffuser_out 2> /dev/null || iptables -N ffuser_out || exit 2\n" "iptables -F ffuser_for 2> /dev/null || iptables -N ffuser_for || exit 3\n\n" "### Create reject chain\n" "iptables -F ffuser_reject 2> /dev/null || iptables -N ffuser_reject || exit 4\n") rs = ruleset.getElementsByTagName("ruleset")[0] utr = rs.getElementsByTagName("use-tcp-reject") useTcpRst = len(utr) != 0; icr = rs.getElementsByTagName("icmp-reject-code") if len(icr): icmpRejectCode = int(getText(icr[0].childNodes)) if useTcpRst: outputFile.write("iptables -A ffuser_reject -p tcp -j REJECT --reject-with=tcp-reset\n") if icmpRejectCodeValid(icmpRejectCode): outputFile.write("iptables -A ffuser_reject -j REJECT --reject-with=%s\n" % icmpRejectCodeToString(icmpRejectCode)) else: outputFile.write("iptables -A ffuser_reject -j REJECT\n"); outputFile.write("\n"); ir = rs.getElementsByTagName("ignore-rules") if not len(ir): rules = rs.getElementsByTagName("rule") for r in rules: handleRule(r) uv = rs.getElementsByTagName("unmatch-verdict") if len(uv): unmatchTarget = targetFromString(getText(uv[0].childNodes)) t = targetToString(unmatchTarget) outputFile.write("\n### Set unmatch target \n" "iptables -A ffuser_in -j %s\n" "iptables -A ffuser_out -j %s\n" "iptables -A ffuser_for -j %s\n\n" % (t, t, t)); outputFile.write("### Now activate all rules\n" "iptables -A ffsys_in -j ffuser_in\n" "iptables -A ffsys_out -j ffuser_out\n" "iptables -A ffsys_for -j ffuser_for\n\n" "set +ex\n"); def main(): if len(sys.argv) > 1: f = open(sys.argv[1], "r") else: f = sys.stdin try: dom = xml.dom.minidom.parse(f) handleRuleset(dom) except xml.parsers.expat.ExpatError, e: sys.stderr.write("%s\n" % str(e)) sys.exit(1) main()