Skip to content

Instantly share code, notes, and snippets.

@vpatel95
Created January 24, 2024 02:06
Show Gist options
  • Save vpatel95/24a7c4d5d777c7280a2a79c40bd85e9c to your computer and use it in GitHub Desktop.
Save vpatel95/24a7c4d5d777c7280a2a79c40bd85e9c to your computer and use it in GitHub Desktop.
Send different tunneled packets using scapy
#!/usr/bin/python3
import os
import sys
import string
from scapy.all import *
from scapy.contrib.mpls import *
from scapy.layers.vxlan import *
import argparse
import time
parser = argparse.ArgumentParser(description="JCNR Scapy tests script",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-t", "--tunnel", default="srv6", help="send tunnel type packet")
parser.add_argument("-m", "--srcmac", default="52:54:00:9b:2f:42", help="src mac")
parser.add_argument("-n", "--dstmac", default="52:54:00:10:15:85", help="dst mac")
parser.add_argument("-r", "--rmac", default="00:00:5e:00:01:00", help="router mac")
parser.add_argument("-s", "--srcip", default="0.0.0.0", help="tunnel src ip")
parser.add_argument("-d", "--dstip", default="0.0.0.0", help="tunnel dst ip")
parser.add_argument("-l", "--labels", default=[3201, 28], type=int,
nargs='+', help="SR-MPLS label stack/VXLAN VNI")
parser.add_argument("-c", "--count", default="1", help="number of pkts to send")
parser.add_argument("-i", "--interface", default="enp4s0",
help="interface name to be used to send traffic")
args = parser.parse_args()
config = vars(args)
tunnel_type = config["tunnel"]
srcip = config["srcip"]
dstip = config["dstip"]
srcmac = config["srcmac"]
dstmac = config["dstmac"]
pktcount = int(config["count"])
vif_name = config["interface"]
labels = config["labels"]
rmac = config["rmac"]
print(tunnel_type, vif_name, srcip, dstip, srcmac, dstmac, pktcount, labels, rmac)
def srmpls_header():
mpls = MPLS(label=labels[-1], ttl=65, s=1)
for label in labels[-2::-1]:
mpls = MPLS(label=label, ttl=65, s=0)/mpls
return mpls
def srv6_header():
return IPv6(src=srcip, dst=dstip, nh=4)
def evpn5_header():
ip = IP(src=srcip, dst=dstip, proto=17)
udp = UDP(sport=34380, dport=4789)
vxlan = VXLAN(vni=labels[0])
eth = Ether(src='3a:8b:18:5d:ba:67', dst=rmac)
vxlan_hdr = ip/udp/vxlan/eth
return vxlan_hdr
def get_tunnel_header():
if tunnel_type == "srv6":
return srv6_header()
elif tunnel_type == "srmpls":
return srmpls_header()
elif tunnel_type == "evpn5":
return evpn5_header()
else:
print("Unsupported tunnel type : ", tunnel_type)
sys.exit(1)
def get_pkt():
eth=Ether(src=srcmac, dst=dstmac)
tunnel=get_tunnel_header()
ip4=IP(src='20.20.14.11', dst='20.20.24.21', proto=1)
icmp=ICMP(type=8)
pkt = eth/tunnel/ip4/icmp
return pkt
def send_pkts():
pkt = get_pkt()
print("Sending pkt : ", pkt.show())
i = 0
while i < pktcount:
sendp(pkt,iface=vif_name,count=1)
time.sleep(1)
i += 1
send_pkts()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment