Created
January 24, 2024 02:06
-
-
Save vpatel95/24a7c4d5d777c7280a2a79c40bd85e9c to your computer and use it in GitHub Desktop.
Send different tunneled packets using scapy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import sys | |
import string | |
from scapy.all import * | |
from scapy.contrib.mpls import * | |
from scapy.layers.vxlan import * | |
import argparse | |
import time | |
# Command-line interface. Every option has a default, so the script can be
# started with no arguments; ArgumentDefaultsHelpFormatter makes --help show
# those defaults next to each option.
parser = argparse.ArgumentParser(
    description="JCNR Scapy tests script",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-t", "--tunnel", default="srv6", help="send tunnel type packet")
parser.add_argument("-m", "--srcmac", default="52:54:00:9b:2f:42", help="src mac")
parser.add_argument("-n", "--dstmac", default="52:54:00:10:15:85", help="dst mac")
parser.add_argument("-r", "--rmac", default="00:00:5e:00:01:00", help="router mac")
parser.add_argument("-s", "--srcip", default="0.0.0.0", help="tunnel src ip")
parser.add_argument("-d", "--dstip", default="0.0.0.0", help="tunnel dst ip")
parser.add_argument("-l", "--labels", default=[3201, 28], type=int,
                    nargs='+', help="SR-MPLS label stack/VXLAN VNI")
# type=int lets argparse reject a non-numeric count with a normal usage error
# instead of a ValueError traceback later on; wrapping the parsed value in
# int() again downstream remains harmless.
parser.add_argument("-c", "--count", default=1, type=int,
                    help="number of pkts to send")
parser.add_argument("-i", "--interface", default="enp4s0",
                    help="interface name to be used to send traffic")
args = parser.parse_args()
# Copy the parsed options into the module-level names that the packet
# builders below read. `config` keeps the dict view of the same namespace.
config = vars(args)
tunnel_type = args.tunnel
srcip, dstip = args.srcip, args.dstip
srcmac, dstmac = args.srcmac, args.dstmac
pktcount = int(args.count)
vif_name = args.interface
labels = args.labels
rmac = args.rmac
# Echo the effective settings before anything is built or sent.
print(tunnel_type, vif_name, srcip, dstip, srcmac, dstmac, pktcount, labels, rmac)
def srmpls_header():
    """Build the MPLS label stack from the module-level ``labels`` list.

    Labels are stacked in list order: the first entry ends up outermost and
    the last entry innermost. Only the last label has the bottom-of-stack
    bit set (``s=1``); every label uses ``ttl=65``.
    """
    bottom_label = labels[-1]
    stack = MPLS(label=bottom_label, ttl=65, s=1)
    # Walk the remaining labels from the innermost outwards, prepending each.
    for outer_label in reversed(labels[:-1]):
        stack = MPLS(label=outer_label, ttl=65, s=0) / stack
    return stack
def srv6_header():
    """Build the outer IPv6 header used for the "srv6" tunnel type.

    Source and destination come from the module-level ``srcip`` / ``dstip``.
    The next-header field is set to 4 (IPv4 encapsulation); no segment
    routing extension header is added here.
    """
    outer_ipv6 = IPv6(src=srcip, dst=dstip, nh=4)
    return outer_ipv6
def evpn5_header():
    """Build the VXLAN encapsulation used for the "evpn5" tunnel type.

    The result is IPv4 / UDP / VXLAN / inner Ethernet. The outer addresses
    come from the module-level ``srcip`` / ``dstip``, the VNI is the first
    entry of ``labels``, and the inner frame is addressed to ``rmac`` from a
    fixed source MAC.
    """
    outer_ip = IP(src=srcip, dst=dstip, proto=17)
    outer_udp = UDP(sport=34380, dport=4789)
    vxlan_layer = VXLAN(vni=labels[0])
    inner_eth = Ether(src='3a:8b:18:5d:ba:67', dst=rmac)
    return outer_ip / outer_udp / vxlan_layer / inner_eth
def get_tunnel_header():
    """Return the tunnel header for the module-level ``tunnel_type``.

    Supported values are "srv6", "srmpls" and "evpn5". Any other value
    prints a message and terminates the process with exit status 1.
    """
    builders = {
        "srv6": srv6_header,
        "srmpls": srmpls_header,
        "evpn5": evpn5_header,
    }
    build = builders.get(tunnel_type)
    if build is None:
        print("Unsupported tunnel type : ", tunnel_type)
        sys.exit(1)
    return build()
def get_pkt():
    """Assemble the full test frame.

    Layout: outer Ethernet (``srcmac`` -> ``dstmac``), the tunnel header
    selected by get_tunnel_header(), then a fixed inner IPv4 packet
    (20.20.14.11 -> 20.20.24.21) carrying an ICMP message of type 8.
    """
    return (
        Ether(src=srcmac, dst=dstmac)
        / get_tunnel_header()
        / IP(src='20.20.14.11', dst='20.20.24.21', proto=1)
        / ICMP(type=8)
    )
def send_pkts():
    """Build the test packet once and transmit it ``pktcount`` times.

    The packet is dumped to stdout first, then sent on the interface named
    by the module-level ``vif_name``, one frame per iteration with a
    one-second pause after each send.
    """
    pkt = get_pkt()
    # Packet.show() prints the dump itself and returns None, which made the
    # old output end with "Sending pkt :  None". dump=True returns the text
    # so it can be printed under the label instead.
    print("Sending pkt : ", pkt.show(dump=True), sep="\n")
    for _ in range(pktcount):
        sendp(pkt, iface=vif_name, count=1)
        time.sleep(1)


# Script entry point: send the packets as soon as the options are parsed.
send_pkts()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment