PracticeDev/study_python/tcp_relay-master/tcp_relay.py

1185 lines
40 KiB
Python
Executable File

#!/usr/bin/python
import socket
import struct
import threading
import time
import sys
import random
import re
from optparse import OptionParser
from scapy.all import *
conf.use_pcap=True
import pcappy as pcap
from vlan_config import *
from singletest import *
# EtherType value for IPv4.
ETH_P_IP = 0x0800 # Internet Protocol Packet
# TCP header flag bit masks, tested against pkt[TCP].flags throughout this file.
FIN = 0x01
SYN = 0x02
RST = 0x04
PSH = 0x08
ACK = 0x10
URG = 0x20
ECE = 0x40
CWR = 0x80
# Actor id that Connection.create_actor tags with flag "worker".
Actor_Worker = 1
# Values stored in Connection.direction.
NEIGH_TO_ROUTER= 0
ROUTER_TO_NEIGH = 1
# Moduli applied to recomputed sequence/ack numbers and TCP timestamp values.
TCP_MAX_SEQ = 2**32
TCP_TIMESTAMP_MAX = 2**32
# Idle limits compared against time.time() differences by the two
# garbage-collection loops of MPP_TCP.
CONN_MAX_LIFE_TIME = 180
ACTOR_MAX_LIFE_TIME = 60
# Sentinel stored in Actor_List[...]["available"] and Actor.state for an actor
# that is treated as unavailable.
ACTOR_NOT_AVAILABLE = 0
# NOTE(review): not referenced anywhere in the visible part of this file.
deadlock = 60
# Byte counts added to the IP total length by check_pkt_len to obtain the
# expected on-wire frame size (802.1Q tag and Ethernet header).
VLAN_LEN = 4
MAC_LEN = 14
# Lock taken by the MPP_TCP methods that read or modify connection_list.
mutex = threading.Lock()
def check_pkt_len(pkt):
    """Check that the captured frame is as long as its IP header claims.

    The expected size is the IP total length plus the Ethernet and VLAN
    header sizes. Returns 1 when the serialized packet is at least that long,
    otherwise logs both sizes and returns 0.
    """
    expected = pkt[IP].len + VLAN_LEN + MAC_LEN
    captured = len(str(pkt))
    if captured >= expected:
        return 1
    logging.warning("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ received a trunked packet $$$$$$$$$$$$$$$$$$$$$$")
    logging.warning("recv_len=%d",captured)
    logging.warning("pkt_len=%d",expected)
    return 0
def calculate_tcp_length(pkt):
    """Return the TCP payload size derived from the header fields.

    Computed as IP total length minus the IP header (ihl words) and the TCP
    header (dataofs words), so it does not depend on how many bytes were
    actually captured after the TCP header.
    """
    ip_layer = pkt[IP]
    ip_header_bytes = ip_layer.ihl * 4
    tcp_header_bytes = pkt[TCP].dataofs * 4
    return ip_layer.len - ip_header_bytes - tcp_header_bytes
def reassemble_tcp_packet(pkt,seq,ack,sport,dport,vlanid,tsecr,tsval):
    """Return a copy of *pkt* rewritten for the other side of the relay.

    The copy gets the given VLAN id, sequence/ack numbers and ports. When both
    tsecr and tsval are supplied, every 'Timestamp' TCP option is replaced by
    (tsval, tsecr); other options are kept in order. The TCP and IP checksums
    are deleted from the copy so they are recomputed when it is built.
    """
    rebuilt = pkt.copy()
    rebuilt[Dot1Q].vlan = vlanid

    tcp = rebuilt[TCP]
    tcp.seq = seq
    tcp.ack = ack
    tcp.sport = sport
    tcp.dport = dport

    have_stamp = tsecr is not None and tsval is not None
    if have_stamp and tcp.options is not None:
        tcp.options = [
            ('Timestamp', (tsval, tsecr)) if option[0] == 'Timestamp' else option
            for option in tcp.options
        ]

    del tcp.chksum
    del rebuilt[IP].chksum
    return rebuilt
def get_vlanid_to_actor(pkt,actorid):
    """Map the VLAN id of a neighbor-side packet to the VLAN id for *actorid*.

    The packet is re-parsed as an Ethernet frame, its 802.1Q VLAN id is looked
    up with get_north_vlan_by_south, and the entry's
    'north_actor<actorid>_if_vlan' value is returned.
    """
    frame = Ether(str(pkt))
    south_vlan = frame[Dot1Q].vlan
    logging.debug("recv neigh pkt with vlanid:%s"%south_vlan)
    # interface map entry for the neighbor-side VLAN
    mapping = get_north_vlan_by_south(south_vlan)
    north_vlan = mapping['north_actor%s_if_vlan' % str(actorid)]
    logging.debug("send pkt to actor with vlanid %s"%north_vlan)
    return north_vlan
def get_vlanid_to_neigh(pkt):
    """Map the VLAN id of an actor-side packet to the neighbor-side VLAN id.

    The packet is re-parsed as an Ethernet frame, its 802.1Q VLAN id is looked
    up with get_south_vlan_by_north, and the entry's 'south_neighbor_if_vlan'
    value is returned.
    """
    north_vlan = Ether(str(pkt))[Dot1Q].vlan
    # interface map entry for the actor-side VLAN
    mapping = get_south_vlan_by_north(north_vlan)
    return mapping['south_neighbor_if_vlan']
class Actor(object):
    """Relay-side state for one actor attached to a Connection.

    The last packet seen in each direction (actor -> relay, relay -> actor) is
    recorded so the next relayed packet can be renumbered to continue this
    actor's own sequence and timestamp space.
    """

    def __init__(self, actorid):
        self.id = actorid
        self.flag = None        # "worker" when set by Connection.create_actor
        self.syn = 0            # 1 once a SYN-without-ACK from the actor was recorded
        self.actor_port = None  # TCP port used by the actor
        self.neigh_port = None  # TCP port used by the neighbor
        self.conn = None        # Connection this actor belongs to
        self.state = None       # ACTOR_NOT_AVAILABLE or the value copied from Actor_List
        # last packet the actor sent to the relay
        self.actor_to_mpp_pkt = self._blank_record()
        # last packet the relay sent to the actor
        self.mpp_to_actor_pkt = self._blank_record()

    @staticmethod
    def _blank_record():
        """Return a fresh 'last packet' record with all fields zeroed."""
        return {"pkt":None,"flags":0,"seq":0,"ack":0,"len":0,"tsval":0,"tsecr":0}

    @staticmethod
    def _parse_timestamp(options, where=""):
        """Extract (tsval, tsecr) from a TCP option list.

        Each element is an int, or None when there is no 'Timestamp' option or
        that field cannot be converted; conversion failures are logged with
        *where* as a prefix and never raised.
        """
        stamp = dict(options or ()).get("Timestamp")
        if stamp is None:
            return None, None
        parsed = []
        for label, raw in (("tsval", stamp[0]), ("tsecr", stamp[1])):
            value = None
            if raw is not None:
                try:
                    value = int(raw)
                except (TypeError, ValueError):
                    logging.exception("%s (int)%s error: %r", where, label, raw)
            parsed.append(value)
        return parsed[0], parsed[1]

    @staticmethod
    def _record(store, pkt, where):
        """Copy the TCP fields of *pkt* into the record *store*.

        tsval/tsecr are only overwritten when the packet carries TCP options
        and the corresponding timestamp field parses; otherwise the previous
        values are kept.
        """
        tcp = pkt[TCP]
        store["pkt"] = pkt
        store["flags"] = tcp.flags
        store["seq"] = tcp.seq
        store["ack"] = tcp.ack
        store["len"] = calculate_tcp_length(pkt)
        if tcp.dataofs * 4 > 20:  # header longer than 20 bytes => options present
            tsval, tsecr = Actor._parse_timestamp(tcp.options, where)
            if tsval is not None:
                store["tsval"] = tsval
            if tsecr is not None:
                store["tsecr"] = tsecr

    @staticmethod
    def _next_seq(store):
        """Return the sequence number following the segment in *store*.

        SYN and FIN each occupy one sequence number in addition to the
        payload, and the result wraps at TCP_MAX_SEQ.
        """
        consumed = store["len"]
        if store["flags"] & (SYN | FIN):
            consumed += 1
        return (store["seq"] + consumed) % TCP_MAX_SEQ

    def actor_conn_set(self,conn):
        """Attach this actor to its owning Connection."""
        self.conn = conn

    def store_actor_to_mpp_pkt(self,pkt):
        """Record *pkt* as the last packet received from the actor."""
        self._record(self.actor_to_mpp_pkt, pkt, "store_actor_to_mpp_pkt")

    def store_mpp_to_actor_pkt(self,pkt):
        """Record *pkt* as the last packet sent to the actor."""
        self._record(self.mpp_to_actor_pkt, pkt, "store_mpp_to_actor_pkt")

    def actor_syn_set(self):
        """Set self.syn when the last actor packet is a SYN without ACK."""
        flags = self.actor_to_mpp_pkt["flags"]
        if flags & SYN and (flags & ACK) == 0:
            self.syn = 1
            logging.info("actor syn set:%s"%self.id)

    def recaculate_mpp_to_actor_pkt_seq_ack(self):
        """Return (seq, ack) for the next packet to send to the actor.

        seq continues after the last packet sent to the actor; ack
        acknowledges the last packet received from it.
        """
        return (self._next_seq(self.mpp_to_actor_pkt),
                self._next_seq(self.actor_to_mpp_pkt))

    def refill_mpp_to_actor_pkt_timestamp(self,pkt):
        """Return (tsecr, tsval) to put into *pkt* before sending to the actor.

        (None, None) when the packet has no TCP options or no Timestamp
        option. tsecr echoes the last tsval received from the actor; tsval is
        the packet's own value, bumped to that stored value + 1 when it is not
        greater than it. Both are reduced modulo TCP_TIMESTAMP_MAX.
        """
        tcp = pkt[TCP]
        if tcp.dataofs * 4 <= 20:
            return None, None
        stamp = dict(tcp.options).get("Timestamp")
        if stamp is None:
            return None, None
        tsval = stamp[0]
        last_tsval = self.actor_to_mpp_pkt.get("tsval")
        if last_tsval is None:
            return None, tsval % TCP_TIMESTAMP_MAX
        if tsval <= last_tsval:
            logging.info("$$$$$$$$$$$$$$$$$refill_mpp_to_actor_pkt_timestamp,tsval is lower than last tsval")
            tsval = last_tsval + 1
        return last_tsval % TCP_TIMESTAMP_MAX, tsval % TCP_TIMESTAMP_MAX

    def recv_from_actor(self,pkt):
        """Accept *pkt* from the actor if it is the next in-order segment.

        Returns *pkt* after recording it, or None when its sequence number is
        not the one expected after the previously recorded packet. The first
        packet is always accepted and also updates the SYN bookkeeping of the
        actor and of its connection.
        """
        last = self.actor_to_mpp_pkt
        if last.get("pkt") is None:
            logging.info("recv the FIRST packet from actor")
            self.store_actor_to_mpp_pkt(pkt)
            # NOTE(review): the flattened source does not show whether these
            # two calls ran for every accepted packet or only for the first
            # one; they are kept on the first-packet path here.
            self.actor_syn_set()
            self.conn.synchronize_actor()
            return pkt
        if pkt[TCP].seq != self._next_seq(last):
            logging.info("recv a actor packet with wrong seq")
            return None
        logging.info("recv a packet from actor")
        self.store_actor_to_mpp_pkt(pkt)
        return pkt

    def send_to_actor(self,pkt,interface_to_actor):
        """Renumber *pkt* for this actor, record it and send it on the actor interface."""
        print("lqs@ 318 i am in send_to_actor")
        logging.info("send_to_actor")
        seq,ack = self.recaculate_mpp_to_actor_pkt_seq_ack()
        try:
            tsecr,tsval = self.refill_mpp_to_actor_pkt_timestamp(pkt)
        except Exception:
            logging.exception("refill_mpp_to_actor_pkt_timestamp error")
            logging.error("offending packet: %r", pkt)
            # Forward the packet with its original timestamp option instead of
            # failing later on unbound names.
            tsecr = tsval = None
        vlanid = get_vlanid_to_actor(pkt,self.id)
        print("vlan id = %d"%(vlanid))
        # rewrite the tcp packet for this actor
        reassembled_pkt = reassemble_tcp_packet(pkt,seq,ack,self.neigh_port,self.actor_port,vlanid,tsecr,tsval)
        # remember it so the next packet continues from here
        self.store_mpp_to_actor_pkt(reassembled_pkt)
        print(reassembled_pkt.show())
        sendp(reassembled_pkt,iface=interface_to_actor,verbose=0)
        return

    def send_to_neigh(self,pkt,interface_to_neigh):
        """Forward an actor packet to the neighbor when this actor is allowed to.

        NEIGH_TO_ROUTER: only the worker's packets are forwarded.
        ROUTER_TO_NEIGH: once all actors are synchronized the worker's stored
        packet is sent once (whichever actor triggered it); afterwards only
        the worker's packets are forwarded.
        """
        conn = self.conn
        if conn.direction == NEIGH_TO_ROUTER:
            if self.flag == "worker":
                logging.info("NEIGH_TO_ROUTER, WORKER %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
                conn.send_to_neigh(pkt,interface_to_neigh)
            return
        if conn.direction != ROUTER_TO_NEIGH or conn.syn_actor != 1:
            return
        if conn.actor_synchronized_already == 0:
            logging.info("ROUTER_TO_NEIGH, THE FIRST SYN %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
            pkt = conn.actor[Actor_Worker].actor_to_mpp_pkt["pkt"]
            conn.send_to_neigh(pkt,interface_to_neigh)
            conn.actor_synchronized_already = 1
            conn.state = 1
        elif conn.actor_synchronized_already == 1 and self.flag == "worker":
            logging.info("ROUTER_TO_NEIGH, WORKER %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
            conn.send_to_neigh(pkt,interface_to_neigh)
        return
class Connection(object):
    """State of one relayed TCP connection between a neighbor and the router side.

    Holds the per-actor state, the last packet seen from / sent to the
    neighbor, and the FIN/ACK bookkeeping used to detect a completed close.
    """

    def __init__(self, peer_neigh,peer_router):
        self.peer_neigh = peer_neigh    # (IP, port) on the neighbor side
        self.peer_router = peer_router  # (IP, port) on the router side
        self.direction = None           # NEIGH_TO_ROUTER or ROUTER_TO_NEIGH
        self.actor = []                 # Actor objects, indexed by actor id
        # last packet received from the neighbor
        self.neigh_to_mpp_pkt = self._blank_record()
        # last packet sent to the neighbor
        self.mpp_to_neigh_pkt = self._blank_record()
        self.neigh_to_mpp_fin = 0       # 1 once a FIN from the neighbor was recorded
        self.mpp_to_neigh_fin = 0       # 1 once a FIN was sent to the neighbor
        self.neigh_to_mpp_fin_ack = 0   # 1 once the neighbor's FIN was acknowledged
        self.mpp_to_neigh_fin_ack = 0   # 1 once the neighbor acknowledged our FIN
        self.neigh_to_mpp_fin_pkt = self._blank_record()
        self.mpp_to_neigh_fin_pkt = self._blank_record()
        self.syn_actor = 0
        self.syn_neigh = 0
        self.actor_synchronized_already = 0
        self.time = time.time()         # last activity, refreshed by update_conn_time
        self.state = None

    @staticmethod
    def _blank_record():
        """Return a fresh 'last packet' record with all fields zeroed."""
        return {"pkt":None,"flags":0,"seq":0,"ack":0,"len":0,"tsval":0,"tsecr":0}

    @staticmethod
    def _parse_timestamp(options, where=""):
        """Extract (tsval, tsecr) from a TCP option list.

        Each element is an int, or None when there is no 'Timestamp' option or
        that field cannot be converted; conversion failures are logged with
        *where* as a prefix and never raised.
        """
        stamp = dict(options or ()).get("Timestamp")
        if stamp is None:
            return None, None
        parsed = []
        for label, raw in (("tsval", stamp[0]), ("tsecr", stamp[1])):
            value = None
            if raw is not None:
                try:
                    value = int(raw)
                except (TypeError, ValueError):
                    logging.exception("%s (int)%s error: %r", where, label, raw)
            parsed.append(value)
        return parsed[0], parsed[1]

    @staticmethod
    def _record(store, pkt, where):
        """Copy the TCP fields of *pkt* into the record *store*.

        tsval/tsecr are only overwritten when the packet carries TCP options
        and the corresponding timestamp field parses; otherwise the previous
        values are kept.
        """
        tcp = pkt[TCP]
        store["pkt"] = pkt
        store["flags"] = tcp.flags
        store["seq"] = tcp.seq
        store["ack"] = tcp.ack
        store["len"] = calculate_tcp_length(pkt)
        if tcp.dataofs * 4 > 20:  # header longer than 20 bytes => options present
            tsval, tsecr = Connection._parse_timestamp(tcp.options, where)
            if tsval is not None:
                store["tsval"] = tsval
            if tsecr is not None:
                store["tsecr"] = tsecr

    @staticmethod
    def _next_seq(store):
        """Return the sequence number following the segment in *store*.

        SYN and FIN each occupy one sequence number in addition to the
        payload, and the result wraps at TCP_MAX_SEQ.
        """
        consumed = store["len"]
        if store["flags"] & (SYN | FIN):
            consumed += 1
        return (store["seq"] + consumed) % TCP_MAX_SEQ

    def create_actor(self,actorid):
        """Create the Actor for *actorid* and insert it at index *actorid*."""
        actor = Actor(actorid)
        if actorid == Actor_Worker:
            actor.flag = "worker"
        self.actor.insert(actorid,actor)
        self.actor[actorid].actor_conn_set(self)
        self.actor[actorid].state = Actor_List[actorid]["available"]

    def synchronize_actor(self):
        """Set syn_actor to 1 when every available actor has sent its SYN, else 0."""
        for actor in self.actor:
            if actor.state != ACTOR_NOT_AVAILABLE and actor.syn != 1:
                self.syn_actor = 0
                return
        self.syn_actor = 1
        logging.info("all actor syn set")
        return

    def neigh_syn_set(self):
        """Set syn_neigh when the last neighbor packet is a SYN without ACK."""
        flags = self.neigh_to_mpp_pkt["flags"]
        if flags & SYN and (flags & ACK) == 0:
            logging.info("neigh syn set")
            self.syn_neigh = 1

    def update_conn_time(self):
        """Refresh the activity timestamp read by the connection garbage collector."""
        self.time = time.time()

    def store_neigh_to_mpp_pkt(self,pkt):
        """Record a packet received from the neighbor and update FIN bookkeeping.

        A FIN is remembered separately; an ACK whose number acknowledges the
        FIN previously sent to the neighbor sets mpp_to_neigh_fin_ack.
        """
        self._record(self.neigh_to_mpp_pkt, pkt, "store_neigh_to_mpp_pkt")
        tcp = pkt[TCP]
        if tcp.flags & FIN:
            logging.info("FIN PACKET, NEIGH->ACTOR")
            self.neigh_to_mpp_fin = 1
            self._record(self.neigh_to_mpp_fin_pkt, pkt, "store_neigh_to_mpp_pkt")
        if self.mpp_to_neigh_fin == 1 and tcp.flags & ACK:
            # the ACK of a FIN is the FIN's seq + payload + 1, modulo 2**32
            if tcp.ack == self._next_seq(self.mpp_to_neigh_fin_pkt):
                logging.info("ACK PACKET FOR FIN(actor->neigh)")
                self.mpp_to_neigh_fin_ack = 1

    def store_mpp_to_neigh_pkt(self,pkt):
        """Record a packet sent to the neighbor and update FIN bookkeeping.

        A FIN is remembered separately; an ACK whose number acknowledges the
        FIN previously received from the neighbor sets neigh_to_mpp_fin_ack.
        """
        self._record(self.mpp_to_neigh_pkt, pkt, "store_mpp_to_neigh_pkt")
        tcp = pkt[TCP]
        if tcp.flags & FIN:
            logging.info("FIN PACKET, ACTOR->NEIGH")
            self.mpp_to_neigh_fin = 1
            self._record(self.mpp_to_neigh_fin_pkt, pkt, "store_mpp_to_neigh_pkt")
        if self.neigh_to_mpp_fin == 1 and tcp.flags & ACK:
            if tcp.ack == self._next_seq(self.neigh_to_mpp_fin_pkt):
                logging.info("ACK PACKET FOR FIN(neigh->actor)")
                self.neigh_to_mpp_fin_ack = 1

    def recaculate_mpp_to_neigh_pkt_seq_ack(self):
        """Return (seq, ack) for the next packet to send to the neighbor.

        seq continues after the last packet sent to the neighbor; ack
        acknowledges the last packet received from it.
        """
        return (self._next_seq(self.mpp_to_neigh_pkt),
                self._next_seq(self.neigh_to_mpp_pkt))

    def refill_mpp_to_neigh_pkt_timestamp(self,pkt):
        """Return (tsecr, tsval) to put into *pkt* before sending to the neighbor.

        (None, None) when the packet has no TCP options or no Timestamp
        option. tsecr echoes the last tsval received from the neighbor; tsval
        is the packet's own value, bumped to that stored value + 1 when it is
        not greater than it. Both are reduced modulo TCP_TIMESTAMP_MAX.
        """
        tcp = pkt[TCP]
        if tcp.dataofs * 4 <= 20:
            return None, None
        stamp = dict(tcp.options).get("Timestamp")
        if stamp is None:
            return None, None
        tsval = stamp[0]
        last_tsval = self.neigh_to_mpp_pkt.get("tsval")
        if last_tsval is None:
            return None, tsval % TCP_TIMESTAMP_MAX
        if tsval <= last_tsval:
            logging.info("$$$$$$$$$$$$$$$$$refill_mpp_to_neigh_pkt_timestamp,tsval is lower than last tsval")
            tsval = last_tsval + 1
        return last_tsval % TCP_TIMESTAMP_MAX, tsval % TCP_TIMESTAMP_MAX

    def recv_from_neigh(self,pkt):
        """Accept *pkt* from the neighbor if it is the next in-order segment.

        Returns *pkt* after recording it, or None when its sequence number is
        not the one expected after the previously recorded packet. The first
        packet is always accepted.
        """
        last = self.neigh_to_mpp_pkt
        if last.get("pkt") is None:
            logging.info("recv the FIRST packet from neigh")
            self.store_neigh_to_mpp_pkt(pkt)
            # NOTE(review): the flattened source does not show whether this
            # call ran for every accepted packet or only for the first one.
            self.neigh_syn_set()
            return pkt
        if pkt[TCP].seq != self._next_seq(last):
            logging.info("recv a neigh packet with wrong seq")
            return None
        logging.info("recv a packet from neigh")
        self.store_neigh_to_mpp_pkt(pkt)
        return pkt

    def send_to_neigh(self,pkt,interface_to_neigh):
        """Renumber *pkt* for the neighbor, record it and send it; no-op for None."""
        if pkt is None:
            return
        seq,ack = self.recaculate_mpp_to_neigh_pkt_seq_ack()
        try:
            tsecr, tsval = self.refill_mpp_to_neigh_pkt_timestamp(pkt)
        except Exception:
            logging.exception("refill_mpp_to_neigh_pkt_timestamp tsval error")
            logging.error("offending packet: %r", pkt)
            # Forward the packet with its original timestamp option instead of
            # failing later on unbound names.
            tsecr = tsval = None
        vlanid = get_vlanid_to_neigh(pkt)
        worker = self.actor[Actor_Worker]
        # rewrite the tcp packet with the worker's port pair
        reassembled_pkt = reassemble_tcp_packet(pkt,seq,ack,worker.actor_port,worker.neigh_port,vlanid,tsecr,tsval)
        # remember it so the next packet continues from here
        self.store_mpp_to_neigh_pkt(reassembled_pkt)
        sendp(reassembled_pkt,iface=interface_to_neigh,verbose=0)
        return

    def send_to_actor(self,pkt,interface_to_actor):
        """Send *pkt* to every actor of this connection."""
        for actor in self.actor:
            actor.send_to_actor(pkt,interface_to_actor)
class MPP_TCP(object):
def __init__(self,interface_to_neigh,interface_to_actor):
self.interface_to_neigh = interface_to_neigh
self.interface_to_actor = interface_to_actor
self.connection_list = []
def search_conn(self,peer_neigh,peer_router):
    """Return the connection whose both endpoints equal the given ones, or None.

    The scan runs under the module-level mutex; the lock is released on every
    exit path, including an exception raised while comparing.
    """
    with mutex:
        for conn in self.connection_list:
            if conn.peer_neigh == peer_neigh and conn.peer_router == peer_router:
                return conn
    return None
def search_conn_1(self,peer_neigh,peer_router):
    """Return a connection that shares one endpoint and differs only in the other port.

    A match has either the same neighbor endpoint and the same router IP with
    a different router port, or the same router endpoint and the same
    neighbor IP with a different neighbor port. Returns None when nothing
    matches. The scan runs under the module-level mutex, which is released on
    every exit path.
    """
    with mutex:
        for conn in self.connection_list:
            if conn.peer_neigh == peer_neigh:
                if conn.peer_router[0] == peer_router[0] and conn.peer_router[1] != peer_router[1]:
                    return conn
            elif conn.peer_router == peer_router:
                if conn.peer_neigh[0] == peer_neigh[0] and conn.peer_neigh[1] != peer_neigh[1]:
                    return conn
    return None
def search_conn_Actor(self,peer_neigh,peer_router,actorid):
    """Return the connection on which actor *actorid* already uses this source port.

    A match has the same neighbor endpoint, the same router IP, and
    actor[actorid].actor_port equal to peer_router's port. Returns None when
    nothing matches. The mutex is released even if actor[actorid] does not
    exist on some connection and the lookup raises.
    """
    with mutex:
        for conn in self.connection_list:
            same_peers = (conn.peer_neigh[0] == peer_neigh[0]
                          and conn.peer_router[0] == peer_router[0]
                          and conn.peer_neigh[1] == peer_neigh[1])
            if same_peers and conn.actor[actorid].actor_port == peer_router[1]:
                return conn
    return None
def search_conn_Actor_1(self,peer_neigh,peer_router,actorid):
    """Return a connection on which actor *actorid* has no port assigned yet.

    A match has the same neighbor endpoint, the same router IP, and
    actor[actorid].actor_port still None (and therefore different from
    peer_router's port). Returns None when nothing matches. The mutex is
    released even if the actor lookup raises.
    """
    with mutex:
        for conn in self.connection_list:
            same_peers = (conn.peer_neigh[0] == peer_neigh[0]
                          and conn.peer_router[0] == peer_router[0]
                          and conn.peer_neigh[1] == peer_neigh[1])
            if not same_peers:
                continue
            port = conn.actor[actorid].actor_port
            if port != peer_router[1] and port is None:
                return conn
    return None
def judge_same_connection_with_opposite_direction(self,peer_neigh,peer_router):
    """Return a connection between the same IPs whose ports look mirrored.

    A match has the same neighbor IP and router IP, and either its neighbor
    port equals peer_router's port or its router port equals peer_neigh's
    port. Returns None when nothing matches. The scan runs under the
    module-level mutex, which is released on every exit path.
    """
    with mutex:
        for conn in self.connection_list:
            same_ips = (conn.peer_neigh[0] == peer_neigh[0]
                        and conn.peer_router[0] == peer_router[0])
            if not same_ips:
                continue
            if conn.peer_neigh[1] == peer_router[1] or conn.peer_router[1] == peer_neigh[1]:
                return conn
    return None
def del_conn(self,conn):
    """Remove every occurrence of *conn* from connection_list.

    Returns the number of entries removed (0 when the list is empty or *conn*
    is not tracked), so callers that add the result to a counter always get
    an int. The module-level mutex is held for the whole operation and is
    released on every exit path.
    """
    with mutex:
        logging.debug("len of connection_list:%s"%(len(self.connection_list)))
        if not self.connection_list:
            return 0
        logging.debug("connection_list:")
        logging.debug(self.connection_list)
        kept = [entry for entry in self.connection_list if not (entry == conn)]
        removed = len(self.connection_list) - len(kept)
        # replace the contents in place so the list object itself is unchanged
        self.connection_list[:] = kept
        logging.debug("after del , connection_list:")
        logging.debug(self.connection_list)
        return removed
def update_Actor_List(self,actorid):
    """Stamp the Actor_List entry of *actorid* with the current time.

    The "available" field of the matching entry is set to time.time(), which
    is what the actor garbage collector compares against.
    """
    stamp = time.time()
    for entry in Actor_List:
        if entry["actorid"] == actorid:
            entry["available"] = stamp
    return
def conn_garbage_collection(self):
    """Forever: every 60 seconds drop connections idle longer than CONN_MAX_LIFE_TIME.

    The expired connections are collected under the mutex and deleted after
    it is released (del_conn takes the same non-reentrant lock), so a
    concurrent add or delete cannot invalidate the scan.
    """
    while True:
        logging.info("start to collect the conn garbage")
        now = time.time()
        with mutex:
            expired = [conn for conn in self.connection_list
                       if now - conn.time > CONN_MAX_LIFE_TIME]
        for conn in expired:
            self.del_conn(conn)
        time.sleep(60)
def actor_garbage_collection(self):
    """Forever: every 60 seconds mark actors silent for too long as unavailable.

    An Actor_List entry whose "available" stamp is older than
    ACTOR_MAX_LIFE_TIME is set to ACTOR_NOT_AVAILABLE, and the state of the
    actors with that id on every tracked connection is set likewise. The
    connection list is copied under the mutex before it is walked so a
    concurrent delete cannot break the iteration.
    """
    while True:
        logging.info("start to test actors' state ")
        for index, entry in enumerate(Actor_List):
            now = time.time()
            last_seen = entry["available"]
            if last_seen == ACTOR_NOT_AVAILABLE:
                continue
            if now - last_seen <= ACTOR_MAX_LIFE_TIME:
                continue
            entry["available"] = ACTOR_NOT_AVAILABLE
            with mutex:
                tracked = list(self.connection_list)
            for conn in tracked:
                for actor in conn.actor:
                    # actors are matched by their position in Actor_List, as
                    # in the original code
                    if actor.id == index:
                        actor.state = ACTOR_NOT_AVAILABLE
        for entry in Actor_List:
            logging.debug(entry)
        time.sleep(60)
def rst_pkt_process(self,pkt,conn):
    """Handle a reset: log it and stop tracking *conn*. *pkt* itself is not inspected."""
    logging.info("RST PACKET, DEL THE CONNECTION")
    self.del_conn(conn)
def fin_pkt_process(self,conn):
    """Stop tracking *conn* once both FINs and both of their ACKs have been seen."""
    closed_both_ways = (conn.neigh_to_mpp_fin == 1
                        and conn.neigh_to_mpp_fin_ack == 1
                        and conn.mpp_to_neigh_fin == 1
                        and conn.mpp_to_neigh_fin_ack == 1)
    if not closed_both_ways:
        return
    logging.info("FIN FINISHED,DEL THE CONNECTION")
    self.del_conn(conn)
def prepare_rst_packet(self,pkt):
    """Build the RST|ACK reply to *pkt*.

    Ethernet and IP addresses and TCP ports are swapped, the VLAN id is kept,
    seq is the packet's ack number, and ack acknowledges the whole segment:
    its sequence number plus payload length plus one when SYN or FIN is set,
    wrapped at TCP_MAX_SEQ.
    """
    tcp = pkt[TCP]
    segment_len = len(tcp.payload)
    if tcp.flags & (SYN | FIN):
        segment_len += 1  # SYN and FIN each occupy one sequence number
    ack_no = (tcp.seq + segment_len) % TCP_MAX_SEQ
    rst_pkt = (Ether(src=pkt[Ether].dst, dst=pkt[Ether].src)
               / Dot1Q(vlan=pkt[Dot1Q].vlan)
               / IP(dst=pkt[IP].src, src=pkt[IP].dst)
               / TCP(dport=tcp.sport, sport=tcp.dport, ack=ack_no, seq=tcp.ack, flags=RST | ACK))
    return rst_pkt
def deal_neigh_pkt(self,pkt):
    """Handle one frame captured on the neighbor interface.

    The first 16 bytes of the captured frame are stripped and the rest is
    re-parsed; the byte at offset 13 of that prefix ("ftag") selects the VLAN
    id 1000 + ftag that is put on the rebuilt frame. A TCP packet is then
    matched to a connection (a SYN without ACK creates one, anything else
    without a connection is answered with a reset), relayed to the actors if
    it is in sequence, and RST/FIN bookkeeping is applied.
    """
    print("i am in deal_neigh_pkt")
    pkt_str = str(pkt)
    new_pkt = pkt.__class__(pkt_str[16:])
    pkt_hex = pkt_str[:16].encode('hex')
    # byte 13 of the prefix as an integer (two hex digits at positions 26-27)
    ftag = int(pkt_hex[26:28], 16)
    pkt = Ether(src=new_pkt[Ether].src,dst=new_pkt[Ether].dst)/Dot1Q(vlan=1000+ftag)/new_pkt.payload
    if not pkt.haslayer(Ether):
        return
    pkt = Ether(str(pkt))
    if pkt[Ether].type != 0x8100:
        logging.info ("not vlan pkt from neighbor, discard it")
        return
    if check_pkt_len(pkt) == 0:  # truncated packet
        logging.warning("truncated packet dropped: %r", pkt)
        return
    if not pkt.haslayer(TCP):
        logging.info("Warning! Not a TCP packet!")
        return

    dst = pkt[IP].dst
    src = pkt[IP].src
    dport = pkt[TCP].dport
    sport = pkt[TCP].sport
    flags = pkt[TCP].flags
    neigh_peer = (src, sport)
    router_peer = (dst, dport)

    conn = self.search_conn(neigh_peer, router_peer)
    if conn is None:
        if not (flags & SYN and (flags & ACK) == 0):
            # only a SYN may create a connection; reset everything else
            logging.info("no conn,not syn,send reset")
            rst_pkt = self.prepare_rst_packet(pkt)
            sendp(rst_pkt,iface=self.interface_to_neigh,verbose=0)
            return
        logging.info("connection don't exist,recv a syn packet")
        logging.info("judge the same direction connection")
        old_conn = self.search_conn_1(neigh_peer, router_peer)
        if old_conn is not None:
            logging.info("find the old one and del it")
            self.del_conn(old_conn)
        logging.info("judge the opposite direction connection")
        old_conn = self.judge_same_connection_with_opposite_direction(neigh_peer, router_peer)
        print("919 old_conn = %s"%(old_conn,))
        if old_conn is not None:
            logging.info("find the opposite direction connection")
            if old_conn.direction == ROUTER_TO_NEIGH:
                logging.info("find the opposite directin connection with state = 0,send reset to actor")
                for actor in old_conn.actor:
                    last_pkt = actor.actor_to_mpp_pkt["pkt"]
                    if last_pkt is not None:
                        rst_pkt = self.prepare_rst_packet(last_pkt)
                        sendp(rst_pkt,iface=self.interface_to_actor,verbose=0)
                # NOTE(review): the flattened source does not show whether the
                # deletion was inside this direction check; confirm.
                logging.info("del the conn")
                self.del_conn(old_conn)
        logging.info("create a new connection")
        conn = Connection(neigh_peer, router_peer)
        for entry in Actor_List:
            actorid = entry["actorid"]
            conn.create_actor(actorid)
            conn.actor[actorid].actor_port = dport
            conn.actor[actorid].neigh_port = sport
        conn.direction = NEIGH_TO_ROUTER
        conn.state = 1
        with mutex:
            self.connection_list.append(conn)

    conn.update_conn_time()
    if conn.recv_from_neigh(pkt) is None:
        return
    print("lqs@963 send_to_actor")
    conn.send_to_actor(pkt,self.interface_to_actor)
    # process rst and fin packet
    if flags & RST:
        logging.info("neigh send reset")
        self.rst_pkt_process(pkt,conn)
    self.fin_pkt_process(conn)
    return
def deal_actor_pkt_deep(self,pkt,actorid):
    """Handle one TCP packet sent by actor *actorid*.

    Any packet without RST refreshes the actor's availability stamp. If no
    connection matches, the worker may create one with a SYN (otherwise a
    reset is sent back); other actors are matched to an existing connection
    by their port or adopted by one that has no port for them yet, and are
    reset if neither exists. An in-sequence packet is then handed to the
    actor's send_to_neigh, followed by RST/FIN bookkeeping.
    """
    pkt = Ether(str(pkt))
    if not pkt.haslayer(TCP):
        logging.info("Warning! Not a TCP packet!")
        return

    dst = pkt[IP].dst
    src = pkt[IP].src
    dport = pkt[TCP].dport
    sport = pkt[TCP].sport
    neigh_peer = (dst, dport)
    router_peer = (src, sport)
    logging.debug("recv pkt with dst=%s,src=%s,dport=%s,sport=%s"%(dst,src,dport,sport))
    if pkt[TCP].flags & RST == 0:
        self.update_Actor_List(actorid)

    conn = self.search_conn(neigh_peer, router_peer)
    if conn == None and actorid == Actor_Worker:
        logging.info("worker,conn=None")
        if not (pkt[TCP].flags&SYN and pkt[TCP].flags&ACK == 0):
            logging.info("worker send packet,no conn,not syn,send reset")
            rst_pkt = self.prepare_rst_packet(pkt)
            sendp(rst_pkt,iface=self.interface_to_actor,verbose=0)
            return
        # only a SYN packet can create a connection
        logging.info("connection don't exist,recv a syn packet")
        logging.info("judge the same direction connection")
        stale = self.search_conn_1(neigh_peer, router_peer)
        if stale != None:
            logging.info("find the old one and del it")
            self.del_conn(stale)
        logging.info("judge the opposite direction connection")
        opposite = self.judge_same_connection_with_opposite_direction(neigh_peer, router_peer)
        if opposite != None:
            logging.info("find the opposite direction connection")
            if opposite.direction == NEIGH_TO_ROUTER:
                logging.info("already exist opposite direction connection ,send reset")
                rst_pkt = self.prepare_rst_packet(pkt)
                sendp(rst_pkt,iface=self.interface_to_actor,verbose=0)
                return
        logging.info("create a new connection")
        conn = Connection(neigh_peer, router_peer)
        for entry in Actor_List:
            conn.create_actor(entry["actorid"])
        conn.actor[actorid].actor_port = sport
        conn.actor[actorid].neigh_port = dport
        conn.direction = ROUTER_TO_NEIGH
        with mutex:
            self.connection_list.append(conn)
    elif conn == None:
        # non-worker actor: match on IP pair and this actor's own port
        conn = self.search_conn_Actor(neigh_peer, router_peer, actorid)
        if conn == None:
            # otherwise adopt a connection that has no port for this actor yet
            conn = self.search_conn_Actor_1(neigh_peer, router_peer, actorid)
            if conn == None:
                logging.info("inspectior,conn=none,don't create,send rst")
                logging.info("inspector send packet,no conn,send reset")
                rst_pkt = self.prepare_rst_packet(pkt)
                sendp(rst_pkt,iface=self.interface_to_actor,verbose=0)
                return
            logging.info("the same Ip peer and conn.direction == ROUTER_TO_NEIGH,the actor's neigh_port and actor_port==None,means a new connection created by worker,waiting for the inspector to send syn ")
            if pkt[TCP].flags&SYN and pkt[TCP].flags&ACK == 0:
                conn.actor[actorid].actor_port = sport
                conn.actor[actorid].neigh_port = dport

    logging.debug ("conn=%s,conn.direction ==%s,conn.peer_neigh=%s,conn.peer_router=%s"%(conn,conn.direction,conn.peer_neigh,conn.peer_router))
    conn.update_conn_time()
    actor = conn.actor[actorid]
    if actor.recv_from_actor(pkt) == None:
        return
    actor.send_to_neigh(pkt,self.interface_to_neigh)
    # process rst and fin packet
    if actorid == Actor_Worker and pkt[TCP].flags & RST:
        logging.info("received reset, del the conn")
        self.rst_pkt_process(pkt,conn)
        return
    self.fin_pkt_process(conn)
    return
def deal_actor_pkt(self,pkt):
    """Sniff callback for frames captured on the actor-facing interface.

    The frame is re-parsed as Ethernet and must carry an 802.1Q tag
    (ethertype 0x8100). Frames that check_pkt_len() reports as truncated
    are logged and dropped. The VLAN id is mapped to an actor id with
    search_actorid_by_vlanid(); frames with an unknown VLAN id, or an actor
    id above ACTOR_NUMBERS - 1, are dropped. Everything else is forwarded
    to self.deal_actor_pkt_deep(pkt, actorid).

    pkt -- the captured packet handed over by sniff().

    Always returns None; rejected frames are only logged.
    """
    print("i am in deal_actor_pkt")
    if not pkt.haslayer(Ether):
        return

    # Rebuild the packet from its raw bytes so the layers are dissected anew.
    pkt = Ether(str(pkt))
    if pkt[Ether].type != 0x8100:
        logging.info( 'not vlan pkt from actor, discard it')
        return

    if check_pkt_len(pkt) == 0: # truncated packet
        # show() and hexdump() print to stdout and return None by default,
        # which used to put the literal "None" into the log. dump=True makes
        # them return the rendered text so it can actually be logged.
        logging.warning(pkt.show(dump=True))
        logging.warning(hexdump(pkt, dump=True))
        return

    # The VLAN id identifies which actor sent the frame.
    vlanid_actor = pkt[Dot1Q].vlan
    logging.debug("actor pkt with vlan:%s"%vlanid_actor)
    actorid = search_actorid_by_vlanid(vlanid_actor)
    if actorid is None:
        logging.warning("wrong vlanid, not corresponding to an actor")
        return
    if actorid > ACTOR_NUMBERS - 1:
        logging.warning("actorid is bigger than ACTOR_NUMBERS,don't process it")
        return

    self.deal_actor_pkt_deep(pkt,actorid)
def sniff_neth(self):
    """Capture on self.interface_to_neigh, passing each frame to deal_neigh_pkt.

    Blocks inside sniff(); captured packets are not stored (store=0).
    """
    print("self.interface_to_neigh = %s" % self.interface_to_neigh)
    # Capture filter that matches on fixed byte offsets within the frame.
    capture_filter = 'ether[13]>0x00 and ether[13]<0x15 and ether[28]=0x08 and ether[29]=0x00 and ether[39]=0x06'
    sniff(
        iface=self.interface_to_neigh,
        filter=capture_filter,
        prn=self.deal_neigh_pkt,
        store=0,
    )
def sniff_seth(self):
    """Capture on self.interface_to_actor, passing each frame to deal_actor_pkt.

    Blocks inside sniff(); captured packets are not stored (store=0).
    """
    print("self.interface_to_actor = %s" % self.interface_to_actor)
    # Only TCP packets whose IP source address is 192.168.10.1 are captured.
    capture_filter = 'tcp and ip src 192.168.10.1'
    sniff(
        iface=self.interface_to_actor,
        filter=capture_filter,
        prn=self.deal_actor_pkt,
        store=0,
    )
def run(self):
    """Start the relay's four daemon threads and restart any that die.

    The threads run self.sniff_neth, self.sniff_seth,
    self.conn_garbage_collection and self.actor_garbage_collection. After
    starting them, the calling thread loops forever: every `deadlock`
    seconds it checks each thread and, if one is no longer alive, logs
    "<name> is down" and starts a replacement.

    This method does not return.
    """
    # (name used in the log message, thread target), in the order checked.
    workers = [
        ("neth_thread", self.sniff_neth),
        ("seth_thread", self.sniff_seth),
        ("garbage_collection_thread", self.conn_garbage_collection),
        ("actor_garbage_collection_thread", self.actor_garbage_collection),
    ]

    def new_daemon(target):
        # Build (but do not start) a daemon thread running `target`.
        thread = threading.Thread(target=target)
        thread.daemon = True
        return thread

    # Create every thread first, then start them all.
    threads = [new_daemon(target) for _, target in workers]
    for thread in threads:
        thread.start()

    while True:
        time.sleep(deadlock)
        for index, (name, target) in enumerate(workers):
            if threads[index].is_alive():
                continue
            logging.info("%s is down", name)
            threads[index] = new_daemon(target)
            threads[index].start()
def main():
    """Script entry point: load the configuration and run one MPP_TCP relay.

    Returns immediately when single_test() yields 1. Otherwise calls
    init_from_config(), logs every entry of Actor_List, builds an MPP_TCP
    from the two interface names stored in MPP_If_List and calls its run().
    """
    if single_test() == 1:
        return

    init_from_config()
    print("********************")
    for index in range(len(Actor_List)):
        logging.info(Actor_List[index])
    #print_if_vlan_mpp_list()

    south_if_name = MPP_If_List['mpp_south_if_name']
    north_if_name = MPP_If_List['mpp_north_if_name']
    relay = MPP_TCP(south_if_name, north_if_name)
    relay.run()
    #mpp_tcp_1 = MPP_TCP("seth2","neth2")
    #mpp_tcp_1.run()
# Run the relay only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()