#!/usr/bin/python
"""TCP relay between a neighbor-facing and an actor-facing interface.

Packets sniffed on the neighbor interface are forwarded to every actor of the
matching connection, and packets sniffed on the actor interface are forwarded
to the neighbor (only the "worker" actor's traffic is relayed once the
connection is synchronized).  Before forwarding, the VLAN id, TCP ports,
sequence/acknowledgement numbers and the TCP Timestamp option are rewritten
from the per-connection / per-actor bookkeeping kept in this module.

The module targets Python 2 (``str(pkt)`` is used to obtain the raw bytes of a
scapy packet).
"""
import logging
import socket
import struct
import threading
import time
import sys
import random
import re
from optparse import OptionParser
from scapy.all import *
conf.use_pcap = True
import pcappy as pcap
from vlan_config import *
from singletest import *

ETH_P_IP = 0x0800  # Internet Protocol Packet

# TCP flag bits.
FIN = 0x01
SYN = 0x02
RST = 0x04
PSH = 0x08
ACK = 0x10
URG = 0x20
ECE = 0x40
CWR = 0x80

Actor_Worker = 1  # actor id that is flagged "worker" by Connection.create_actor

# Values of Connection.direction.
NEIGH_TO_ROUTER = 0
ROUTER_TO_NEIGH = 1

TCP_MAX_SEQ = 2**32        # sequence/ack numbers are reduced modulo this value
TCP_TIMESTAMP_MAX = 2**32  # tsval/tsecr are reduced modulo this value
CONN_MAX_LIFE_TIME = 180   # seconds without traffic before a connection is collected
ACTOR_MAX_LIFE_TIME = 60   # seconds without traffic before an actor is marked unavailable
ACTOR_NOT_AVAILABLE = 0
deadlock = 60              # seconds between liveness checks of the worker threads in run()
VLAN_LEN = 4
MAC_LEN = 14

# Guards MPP_TCP.connection_list.  It is a plain (non-reentrant) Lock, so code
# holding it must not call another method that acquires it.
mutex = threading.Lock()


def _new_packet_record():
    """Return an empty bookkeeping record for the last packet seen on one leg."""
    return {"pkt": None, "flags": 0, "seq": 0, "ack": 0, "len": 0,
            "tsval": 0, "tsecr": 0}


def _is_initial_syn(flags):
    """Return True when SYN is set and ACK is clear in *flags*."""
    return bool(flags & SYN) and (flags & ACK) == 0


def _is_bare_syn_or_fin(record):
    """Return True when *record* holds a zero-length segment with SYN or FIN set."""
    flags = record.get("flags")
    return record.get("len") == 0 and bool(flags & SYN or flags & FIN)


def _next_seq(record):
    """Return the (unwrapped) sequence number that follows the segment in *record*.

    A zero-length SYN/FIN advances the sequence by one; anything else advances
    it by the stored payload length.
    NOTE(review): a SYN/FIN that also carries payload is advanced by the
    payload length only, as in the original logic - confirm this is intended.
    """
    if _is_bare_syn_or_fin(record):
        return record.get("seq") + 1
    return record.get("seq") + record.get("len")


def _store_timestamp_field(record, key, value, caller, tsdata, pkt):
    """Store ``int(value)`` under ``record[key]``; log and keep the old value on failure."""
    if value is None:
        return
    try:
        record[key] = int(value)
    except (TypeError, ValueError):
        logging.warning("***********************************************")
        logging.debug(tsdata)
        logging.exception("%s (int)%s error", caller, key)
        logging.debug(value)
        # pkt.show() / hexdump() print to stdout themselves; kept as-is.
        logging.debug(str(pkt.show()))
        logging.debug(hexdump(pkt))


def _record_tcp_packet(record, pkt, caller, wire_length=True):
    """Copy the TCP bookkeeping fields of *pkt* into *record*.

    :param record: dict created by ``_new_packet_record``; updated in place.
    :param pkt: scapy packet containing a TCP layer.
    :param caller: name of the public method, used in error log messages.
    :param wire_length: when True the payload length is derived from the IP/TCP
        header fields (``calculate_tcp_length``); when False ``len()`` of the
        scapy TCP payload is used instead.
    """
    tcp = pkt[TCP]
    record["pkt"] = pkt
    record["flags"] = tcp.flags
    record["seq"] = tcp.seq
    record["ack"] = tcp.ack
    if wire_length:
        record["len"] = calculate_tcp_length(pkt)
    else:
        record["len"] = len(tcp.payload)

    # A 20 byte header carries no options, hence no Timestamp.
    if tcp.dataofs * 4 <= 20:
        return
    tsdata = dict(tcp.options)
    timestamp = tsdata.get("Timestamp")
    if timestamp is None:
        return
    _store_timestamp_field(record, "tsval", timestamp[0], caller, tsdata, pkt)
    _store_timestamp_field(record, "tsecr", timestamp[1], caller, tsdata, pkt)


def _refill_timestamp(pkt, last_record, lower_msg):
    """Compute the ``(tsecr, tsval)`` pair to write into an outgoing copy of *pkt*.

    :param pkt: packet about to be forwarded.
    :param last_record: record of the last packet received from the peer the
        packet is forwarded to; its ``tsval`` becomes the echoed ``tsecr``.
    :param lower_msg: message logged when the packet's tsval is not greater
        than the recorded one (it is then bumped to recorded + 1).
    :returns: ``(tsecr, tsval)``; either element is None when it cannot be
        determined (no options / no Timestamp option / no recorded tsval).
    """
    if pkt[TCP].dataofs * 4 <= 20:
        return None, None
    timestamp = dict(pkt[TCP].options).get("Timestamp")
    if timestamp is None:
        return None, None
    tsval = timestamp[0]
    last_tsval = last_record.get("tsval")
    if last_tsval is None:
        return None, tsval % TCP_TIMESTAMP_MAX
    if tsval <= last_tsval:
        logging.info(lower_msg)
        tsval = last_tsval + 1
    return last_tsval % TCP_TIMESTAMP_MAX, tsval % TCP_TIMESTAMP_MAX


def _start_daemon_thread(target):
    """Create, daemonize and start a thread running *target*; return the thread."""
    worker = threading.Thread(target=target)
    worker.daemon = True
    worker.start()
    return worker


def check_pkt_len(pkt):
    """Return 1 when the captured frame is complete, 0 when it is truncated.

    The expected size is the IP total length plus the VLAN tag and the
    Ethernet header.
    """
    ip_len = pkt[IP].len
    pkt_len = ip_len + VLAN_LEN + MAC_LEN
    recv_len = len(str(pkt))
    if recv_len < pkt_len:
        logging.warning("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ received a trunked packet $$$$$$$$$$$$$$$$$$$$$$")
        logging.warning("recv_len=%d", recv_len)
        logging.warning("pkt_len=%d", pkt_len)
        return 0
    return 1


def calculate_tcp_length(pkt):
    """Return the TCP payload length computed from the IP and TCP header fields."""
    iphdr_len = pkt[IP].ihl * 4
    total_len = pkt[IP].len
    tcp_hdrlen = pkt[TCP].dataofs * 4
    return total_len - iphdr_len - tcp_hdrlen


def reassemble_tcp_packet(pkt, seq, ack, sport, dport, vlanid, tsecr, tsval):
    """Return a copy of *pkt* with VLAN id, seq/ack, ports and timestamp rewritten.

    The Timestamp option is replaced only when both *tsecr* and *tsval* are
    given.  The TCP and IP checksums are deleted so that scapy recomputes them
    when the packet is built.
    """
    reassemble_pkt = pkt.copy()
    reassemble_pkt[Dot1Q].vlan = vlanid
    tcp = reassemble_pkt[TCP]
    tcp.seq = seq
    tcp.ack = ack
    tcp.sport = sport
    tcp.dport = dport
    if tsecr is not None and tsval is not None and tcp.options is not None:
        tcp.options = [
            ('Timestamp', (tsval, tsecr)) if opt[0] == 'Timestamp' else opt
            for opt in tcp.options
        ]
    del reassemble_pkt[TCP].chksum
    del reassemble_pkt[IP].chksum
    return reassemble_pkt


def get_vlanid_to_actor(pkt, actorid):
    """Map the neighbor-side VLAN id of *pkt* to the VLAN id used towards *actorid*."""
    pkt = Ether(str(pkt))
    vlanid_neigh = pkt[Dot1Q].vlan
    logging.debug("recv neigh pkt with vlanid:%s", vlanid_neigh)
    # get interface map entry
    if_map_entry = get_north_vlan_by_south(vlanid_neigh)
    vlanid_actor = if_map_entry['north_actor' + str(actorid) + '_if_vlan']
    logging.debug("send pkt to actor with vlanid %s", vlanid_actor)
    return vlanid_actor


def get_vlanid_to_neigh(pkt):
    """Map the actor-side VLAN id of *pkt* to the VLAN id used towards the neighbor."""
    pkt = Ether(str(pkt))
    vlanid_actor = pkt[Dot1Q].vlan
    # get interface map entry
    if_map_entry = get_south_vlan_by_north(vlanid_actor)
    return if_map_entry['south_neighbor_if_vlan']


class Actor(object):
    """Per-connection state of one actor and the relay logic for its leg."""

    def __init__(self, actorid):
        self.id = actorid
        self.flag = None        # "worker" for the worker actor, otherwise None
        self.syn = 0            # 1 once this actor has sent its initial SYN
        self.actor_port = None  # TCP port used by the actor
        self.neigh_port = None  # TCP port used by the neighbor
        self.conn = None        # Connection this actor belongs to
        self.state = None       # availability; ACTOR_NOT_AVAILABLE when timed out
        # Last packet received from the actor; updated on every accepted packet.
        self.actor_to_mpp_pkt = _new_packet_record()
        # Last packet sent to the actor.
        self.mpp_to_actor_pkt = _new_packet_record()

    def actor_conn_set(self, conn):
        """Attach this actor to *conn*."""
        self.conn = conn

    def store_actor_to_mpp_pkt(self, pkt):
        """Record *pkt* as the last packet received from the actor."""
        _record_tcp_packet(self.actor_to_mpp_pkt, pkt, "store_actor_to_mpp_pkt")

    def store_mpp_to_actor_pkt(self, pkt):
        """Record *pkt* as the last packet sent to the actor."""
        _record_tcp_packet(self.mpp_to_actor_pkt, pkt, "store_mpp_to_actor_pkt")

    def actor_syn_set(self):
        """Mark the actor as synchronized when its last packet was an initial SYN."""
        if _is_initial_syn(self.actor_to_mpp_pkt["flags"]):
            self.syn = 1
            logging.info("actor syn set:%s", self.id)

    def recaculate_mpp_to_actor_pkt_seq_ack(self):
        """Return ``(seq, ack)`` for the next packet sent to the actor.

        seq continues the last packet sent to the actor; ack acknowledges the
        last packet received from it.  Both are wrapped modulo TCP_MAX_SEQ.
        """
        seq = _next_seq(self.mpp_to_actor_pkt)
        ack = _next_seq(self.actor_to_mpp_pkt)
        return seq % TCP_MAX_SEQ, ack % TCP_MAX_SEQ

    def refill_mpp_to_actor_pkt_timestamp(self, pkt):
        """Return ``(tsecr, tsval)`` for *pkt* when forwarded to the actor."""
        return _refill_timestamp(
            pkt, self.actor_to_mpp_pkt,
            "$$$$$$$$$$$$$$$$$refill_mpp_to_actor_pkt_timestamp,tsval is lower than last tsval")

    def recv_from_actor(self, pkt):
        """Accept *pkt* from the actor if it carries the expected sequence number.

        :returns: *pkt* when accepted (and recorded), None when its sequence
            number does not follow the previously recorded packet.
        """
        if self.actor_to_mpp_pkt.get("pkt") is not None:
            # Wrap the expectation so that it still matches after seq overflow.
            expected = _next_seq(self.actor_to_mpp_pkt) % TCP_MAX_SEQ
            if pkt[TCP].seq != expected:
                logging.info("recv a actor packet with wrong seq")
                return None
            logging.info("recv a packet from actor")
        else:
            logging.info("recv the FIRST packet from actor")
        self.store_actor_to_mpp_pkt(pkt)
        self.actor_syn_set()
        self.conn.synchronize_actor()
        return pkt

    def send_to_actor(self, pkt, interface_to_actor):
        """Rewrite *pkt* for this actor and send it on *interface_to_actor*."""
        print("lqs@ 318 i am in send_to_actor")
        logging.info("send_to_actor")
        seq, ack = self.recaculate_mpp_to_actor_pkt_seq_ack()
        try:
            tsecr, tsval = self.refill_mpp_to_actor_pkt_timestamp(pkt)
        except Exception:
            logging.exception("refill_mpp_to_actor_pkt_timestamp error")
            logging.exception(str(pkt.show()))
            logging.exception(hexdump(pkt))
            # Forward without touching the Timestamp option instead of
            # failing on unbound names below.
            tsecr, tsval = None, None
        vlanid = get_vlanid_to_actor(pkt, self.id)
        print("vlan id = %d" % (vlanid))
        # reassemble the tcp packet
        reassembled_pkt = reassemble_tcp_packet(
            pkt, seq, ack, self.neigh_port, self.actor_port, vlanid, tsecr, tsval)
        # store the reassembled pkt for next use
        self.store_mpp_to_actor_pkt(reassembled_pkt)
        print(reassembled_pkt.show())
        # send to actor
        sendp(reassembled_pkt, iface=interface_to_actor, verbose=0)

    def send_to_neigh(self, pkt, interface_to_neigh):
        """Forward a packet received from this actor towards the neighbor.

        For neighbor-initiated connections only the worker's packets are
        relayed.  For actor-initiated connections nothing is relayed until all
        available actors have sent their SYN; at that moment the worker's
        recorded packet is sent once, and afterwards only the worker's packets
        are relayed.
        """
        conn = self.conn
        if conn.direction == NEIGH_TO_ROUTER:
            if self.flag == "worker":
                logging.info("NEIGH_TO_ROUTER, WORKER %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
                conn.send_to_neigh(pkt, interface_to_neigh)
            return
        if conn.direction != ROUTER_TO_NEIGH or conn.syn_actor != 1:
            return
        if conn.actor_synchronized_already == 0:
            logging.info("ROUTER_TO_NEIGH, THE FIRST SYN %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
            pkt = conn.actor[Actor_Worker].actor_to_mpp_pkt["pkt"]
            conn.send_to_neigh(pkt, interface_to_neigh)
            conn.actor_synchronized_already = 1
            conn.state = 1
            return
        if conn.actor_synchronized_already == 1 and self.flag == "worker":
            logging.info("ROUTER_TO_NEIGH, WORKER %%%%%%%%%%%%%%%%%%%%%%%%send to neigh")
            conn.send_to_neigh(pkt, interface_to_neigh)


class Connection(object):
    """State of one relayed TCP connection between a neighbor and the actors."""

    def __init__(self, peer_neigh, peer_router):
        self.peer_neigh = peer_neigh    # (IP1, port1)
        self.peer_router = peer_router  # (IP2, port2)
        self.direction = None           # NEIGH_TO_ROUTER or ROUTER_TO_NEIGH
        self.actor = []                 # Actor objects, indexed by actor id
        # Last packet received from the neighbor.
        self.neigh_to_mpp_pkt = _new_packet_record()
        # Last packet sent to the neighbor.
        self.mpp_to_neigh_pkt = _new_packet_record()
        self.neigh_to_mpp_fin = 0       # 1 once a FIN from the neighbor was seen
        self.mpp_to_neigh_fin = 0       # 1 once a FIN was sent to the neighbor
        self.neigh_to_mpp_fin_ack = 0   # 1 once the neighbor's FIN was acknowledged
        self.mpp_to_neigh_fin_ack = 0   # 1 once the neighbor acknowledged our FIN
        self.neigh_to_mpp_fin_pkt = _new_packet_record()  # the neighbor's FIN
        self.mpp_to_neigh_fin_pkt = _new_packet_record()  # the FIN sent to the neighbor
        self.syn_actor = 0              # 1 when every available actor has sent SYN
        self.syn_neigh = 0              # 1 when the neighbor has sent an initial SYN
        self.actor_synchronized_already = 0
        self.time = time.time()         # time of the last packet; used for expiry
        self.state = None

    def create_actor(self, actorid):
        """Create the Actor for *actorid* and insert it at that index."""
        actor = Actor(actorid)
        if actorid == Actor_Worker:
            actor.flag = "worker"
        self.actor.insert(actorid, actor)
        self.actor[actorid].actor_conn_set(self)
        self.actor[actorid].state = Actor_List[actorid]["available"]

    def synchronize_actor(self):
        """Set ``syn_actor`` to 1 when all available actors have sent SYN, else 0."""
        for actor in self.actor:
            if actor.state != ACTOR_NOT_AVAILABLE and actor.syn != 1:
                self.syn_actor = 0
                return
        self.syn_actor = 1
        logging.info("all actor syn set")

    def neigh_syn_set(self):
        """Set ``syn_neigh`` when the neighbor's last packet was an initial SYN."""
        if _is_initial_syn(self.neigh_to_mpp_pkt["flags"]):
            logging.info("neigh syn set")
            self.syn_neigh = 1

    def update_conn_time(self):
        """Refresh the last-activity timestamp of the connection."""
        self.time = time.time()

    def store_neigh_to_mpp_pkt(self, pkt):
        """Record *pkt* as the last packet from the neighbor and track FIN state."""
        _record_tcp_packet(self.neigh_to_mpp_pkt, pkt, "store_neigh_to_mpp_pkt")
        tcp = pkt[TCP]
        if tcp.flags & FIN:  # neigh->mpp FIN packet
            logging.info("FIN PACKET, NEIGH->ACTOR")
            self.neigh_to_mpp_fin = 1
            _record_tcp_packet(self.neigh_to_mpp_fin_pkt, pkt,
                               "store_neigh_to_mpp_pkt", wire_length=False)
        # If a FIN was already sent to the neighbor, check whether this packet
        # acknowledges it.
        if self.mpp_to_neigh_fin == 1 and tcp.flags & ACK:
            if tcp.ack == (self.mpp_to_neigh_fin_pkt["seq"] + 1) % TCP_MAX_SEQ:
                logging.info("ACK PACKET FOR FIN(actor->neigh)")
                self.mpp_to_neigh_fin_ack = 1

    def store_mpp_to_neigh_pkt(self, pkt):
        """Record *pkt* as the last packet sent to the neighbor and track FIN state."""
        _record_tcp_packet(self.mpp_to_neigh_pkt, pkt, "store_mpp_to_neigh_pkt")
        tcp = pkt[TCP]
        if tcp.flags & FIN:  # mpp->neigh FIN packet
            logging.info("FIN PACKET, ACTOR->NEIGH")
            self.mpp_to_neigh_fin = 1
            _record_tcp_packet(self.mpp_to_neigh_fin_pkt, pkt,
                               "store_mpp_to_neigh_pkt", wire_length=False)
        # If the neighbor already sent a FIN, check whether this packet
        # acknowledges it.
        if self.neigh_to_mpp_fin == 1 and tcp.flags & ACK:
            if tcp.ack == (self.neigh_to_mpp_fin_pkt["seq"] + 1) % TCP_MAX_SEQ:
                logging.info("ACK PACKET FOR FIN(neigh->actor)")
                self.neigh_to_mpp_fin_ack = 1

    def recaculate_mpp_to_neigh_pkt_seq_ack(self):
        """Return ``(seq, ack)`` for the next packet sent to the neighbor.

        seq continues the last packet sent to the neighbor; ack acknowledges
        the last packet received from it.  Both are wrapped modulo TCP_MAX_SEQ.
        """
        seq = _next_seq(self.mpp_to_neigh_pkt)
        if _is_bare_syn_or_fin(self.neigh_to_mpp_pkt):
            flags = self.neigh_to_mpp_pkt.get("flags")
            logging.debug("$$$$$$$$$$$$$$$$$$ flag&SYN=%s, flag&FIN=%s",
                          flags & SYN, flags & FIN)
        ack = _next_seq(self.neigh_to_mpp_pkt)
        return seq % TCP_MAX_SEQ, ack % TCP_MAX_SEQ

    def refill_mpp_to_neigh_pkt_timestamp(self, pkt):
        """Return ``(tsecr, tsval)`` for *pkt* when forwarded to the neighbor."""
        return _refill_timestamp(
            pkt, self.neigh_to_mpp_pkt,
            "$$$$$$$$$$$$$$$$$refill_mpp_to_neigh_pkt_timestamp,tsval is lower than last tsval")

    def recv_from_neigh(self, pkt):
        """Accept *pkt* from the neighbor if it carries the expected sequence number.

        :returns: *pkt* when accepted (and recorded), None when its sequence
            number does not follow the previously recorded packet.
        """
        if self.neigh_to_mpp_pkt.get("pkt") is not None:
            # Wrap the expectation so that it still matches after seq overflow.
            expected = _next_seq(self.neigh_to_mpp_pkt) % TCP_MAX_SEQ
            if pkt[TCP].seq != expected:
                logging.info("recv a neigh packet with wrong seq")
                return None
            logging.info("recv a packet from neigh")
        else:
            logging.info("recv the FIRST packet from neigh")
        self.store_neigh_to_mpp_pkt(pkt)
        self.neigh_syn_set()
        return pkt

    def send_to_neigh(self, pkt, interface_to_neigh):
        """Rewrite *pkt* for the neighbor and send it; a None *pkt* is ignored."""
        if pkt is None:
            return
        seq, ack = self.recaculate_mpp_to_neigh_pkt_seq_ack()
        try:
            tsecr, tsval = self.refill_mpp_to_neigh_pkt_timestamp(pkt)
        except Exception:
            logging.exception("refill_mpp_to_neigh_pkt_timestamp tsval error")
            logging.exception(str(pkt.show()))
            logging.exception(hexdump(pkt))
            # Forward without touching the Timestamp option instead of
            # failing on unbound names below.
            tsecr, tsval = None, None
        vlanid = get_vlanid_to_neigh(pkt)
        worker = self.actor[Actor_Worker]
        # reassemble the tcp packet
        reassembled_pkt = reassemble_tcp_packet(
            pkt, seq, ack, worker.actor_port, worker.neigh_port, vlanid, tsecr, tsval)
        # store the reassembled pkt for next use
        self.store_mpp_to_neigh_pkt(reassembled_pkt)
        # send to neigh
        sendp(reassembled_pkt, iface=interface_to_neigh, verbose=0)

    def send_to_actor(self, pkt, interface_to_actor):
        """Forward *pkt* to every actor of the connection."""
        for actor in self.actor:
            actor.send_to_actor(pkt, interface_to_actor)


class MPP_TCP(object):
    """Sniffs both interfaces and relays TCP packets between them."""

    def __init__(self, interface_to_neigh, interface_to_actor):
        self.interface_to_neigh = interface_to_neigh
        self.interface_to_actor = interface_to_actor
        self.connection_list = []  # guarded by the module-level ``mutex``

    def search_conn(self, peer_neigh, peer_router):
        """Return the connection with exactly these two endpoints, or None."""
        with mutex:
            for conn in self.connection_list:
                if conn.peer_neigh == peer_neigh and conn.peer_router == peer_router:
                    return conn
        return None

    def search_conn_1(self, peer_neigh, peer_router):
        """Return a connection sharing one endpoint and the other endpoint's IP.

        Matches a connection whose one endpoint equals the given one while the
        other endpoint has the same IP but a different port; None otherwise.
        """
        with mutex:
            for conn in self.connection_list:
                if conn.peer_neigh == peer_neigh:
                    if (conn.peer_router[0] == peer_router[0]
                            and conn.peer_router[1] != peer_router[1]):
                        return conn
                elif conn.peer_router == peer_router:
                    if (conn.peer_neigh[0] == peer_neigh[0]
                            and conn.peer_neigh[1] != peer_neigh[1]):
                        return conn
        return None

    def search_conn_Actor(self, peer_neigh, peer_router, actorid):
        """Return the connection on which actor *actorid* uses port ``peer_router[1]``.

        The neighbor endpoint and the router IP must match as well.
        """
        with mutex:
            for conn in self.connection_list:
                if (conn.peer_neigh[0] == peer_neigh[0]
                        and conn.peer_router[0] == peer_router[0]
                        and conn.peer_neigh[1] == peer_neigh[1]):
                    if conn.actor[actorid].actor_port == peer_router[1]:
                        return conn
        return None

    def search_conn_Actor_1(self, peer_neigh, peer_router, actorid):
        """Return a matching connection on which actor *actorid* has no port yet."""
        with mutex:
            for conn in self.connection_list:
                if (conn.peer_neigh[0] == peer_neigh[0]
                        and conn.peer_router[0] == peer_router[0]
                        and conn.peer_neigh[1] == peer_neigh[1]):
                    port = conn.actor[actorid].actor_port
                    if port != peer_router[1] and port is None:
                        return conn
        return None

    def judge_same_connection_with_opposite_direction(self, peer_neigh, peer_router):
        """Return a connection between the same IPs whose ports are crossed, or None."""
        with mutex:
            for conn in self.connection_list:
                same_ips = (conn.peer_neigh[0] == peer_neigh[0]
                            and conn.peer_router[0] == peer_router[0])
                # the opposite tcp dstport
                crossed_ports = (conn.peer_neigh[1] == peer_router[1]
                                 or conn.peer_router[1] == peer_neigh[1])
                if same_ips and crossed_ports:
                    return conn
        return None

    def del_conn(self, conn):
        """Remove every occurrence of *conn* from the connection list.

        :returns: 1 when something was removed, 0 otherwise.  The lock is
            released on every path.
        """
        removed = 0
        with mutex:
            logging.debug("len of connection_list:%s", len(self.connection_list))
            if not self.connection_list:
                return 0
            logging.debug("connection_list:")
            logging.debug(self.connection_list)
            while conn in self.connection_list:
                self.connection_list.remove(conn)
                removed = 1
            logging.debug("after del , connection_list:")
            logging.debug(self.connection_list)
        return removed

    def update_Actor_List(self, actorid):
        """Stamp the Actor_List entries of *actorid* with the current time."""
        now = time.time()
        for entry in Actor_List:
            if entry["actorid"] == actorid:
                entry["available"] = now

    def conn_garbage_collection(self):
        """Forever: every 60 s drop connections idle longer than CONN_MAX_LIFE_TIME."""
        while True:
            logging.info("start to collect the conn garbage")
            now = time.time()
            # Snapshot under the lock; del_conn takes the (non-reentrant) lock
            # itself, so it must be called after the lock is released.
            with mutex:
                expired = [conn for conn in self.connection_list
                           if now - conn.time > CONN_MAX_LIFE_TIME]
            for conn in expired:
                self.del_conn(conn)
            time.sleep(60)

    def actor_garbage_collection(self):
        """Forever: every 60 s mark actors silent for ACTOR_MAX_LIFE_TIME as unavailable."""
        while True:
            logging.info("start to test actors' state ")
            for index, entry in enumerate(Actor_List):
                now = time.time()
                if entry["available"] == ACTOR_NOT_AVAILABLE:
                    continue
                if now - entry["available"] <= ACTOR_MAX_LIFE_TIME:
                    continue
                entry["available"] = ACTOR_NOT_AVAILABLE
                # Iterate a snapshot so concurrent add/remove cannot break the loop.
                with mutex:
                    connections = list(self.connection_list)
                for conn in connections:
                    for actor in conn.actor:
                        if actor.id == index:
                            actor.state = ACTOR_NOT_AVAILABLE
            for entry in Actor_List:
                logging.debug(entry)
            time.sleep(60)

    def rst_pkt_process(self, pkt, conn):
        """Drop *conn* after a RST was seen."""
        logging.info("RST PACKET, DEL THE CONNECTION")
        self.del_conn(conn)

    def fin_pkt_process(self, conn):
        """Drop *conn* once both FINs were seen and both were acknowledged."""
        if (conn.neigh_to_mpp_fin == 1 and conn.neigh_to_mpp_fin_ack == 1
                and conn.mpp_to_neigh_fin == 1 and conn.mpp_to_neigh_fin_ack == 1):
            logging.info("FIN FINISHED,DEL THE CONNECTION")
            self.del_conn(conn)

    def prepare_rst_packet(self, pkt):
        """Build a RST|ACK answering *pkt* (addresses and ports swapped)."""
        tcp = pkt[TCP]
        payload_len = len(tcp.payload)
        if payload_len == 0 and (tcp.flags & SYN or tcp.flags & FIN):
            ack_num = tcp.seq + 1
        else:
            ack_num = tcp.seq + payload_len
        return (Ether(src=pkt[Ether].dst, dst=pkt[Ether].src)
                / Dot1Q(vlan=pkt[Dot1Q].vlan)
                / IP(dst=pkt[IP].src, src=pkt[IP].dst)
                / TCP(dport=tcp.sport, sport=tcp.dport,
                      ack=ack_num % TCP_MAX_SEQ, seq=tcp.ack, flags=RST | ACK))

    def _open_conn_from_neigh(self, peer_neigh, peer_router):
        """Create and register the connection for an initial SYN from the neighbor.

        Stale connections that clash with the new one are removed first; for a
        clashing actor-initiated connection the actors are reset beforehand.
        """
        logging.info("connection don't exist,recv a syn packet")
        logging.info("judge the same direction connection")
        old_conn = self.search_conn_1(peer_neigh, peer_router)
        if old_conn is not None:
            logging.info("find the old one and del it")
            self.del_conn(old_conn)
        logging.info("judge the opposite direction connection")
        old_conn = self.judge_same_connection_with_opposite_direction(peer_neigh, peer_router)
        print("919 old_conn = %s" % (old_conn))
        if old_conn is not None:
            logging.info("find the opposite direction connection")
            if old_conn.direction == ROUTER_TO_NEIGH:
                logging.info("find the opposite directin connection with state = 0,send reset to actor")
                for actor in old_conn.actor:
                    last_pkt = actor.actor_to_mpp_pkt["pkt"]
                    if last_pkt is not None:
                        rst_pkt = self.prepare_rst_packet(last_pkt)
                        sendp(rst_pkt, iface=self.interface_to_actor, verbose=0)
                logging.info("del the conn")
                self.del_conn(old_conn)
        logging.info("create a new connection")
        conn = Connection(peer_neigh, peer_router)
        for entry in Actor_List:
            actorid = entry["actorid"]
            conn.create_actor(actorid)
            conn.actor[actorid].actor_port = peer_router[1]
            conn.actor[actorid].neigh_port = peer_neigh[1]
        conn.direction = NEIGH_TO_ROUTER
        conn.state = 1
        with mutex:
            self.connection_list.append(conn)
        return conn

    def deal_neigh_pkt(self, pkt):
        """Sniff callback for the neighbor interface.

        The captured frame carries 16 leading bytes in front of the Ethernet
        frame; byte 13 of them is read as ``ftag`` and turned into VLAN id
        ``1000 + ftag``.
        """
        print("i am in deal_neigh_pkt")
        pkt_str = str(pkt)
        new_pkt = pkt.__class__(pkt_str[16:])
        # Read the tag byte numerically; parsing its hex digits as decimal
        # only worked for values 1..9.
        ftag = struct.unpack_from("B", pkt_str, 13)[0]
        pkt = (Ether(src=new_pkt[Ether].src, dst=new_pkt[Ether].dst)
               / Dot1Q(vlan=1000 + ftag) / new_pkt.payload)
        if not pkt.haslayer(Ether):
            return
        pkt = Ether(str(pkt))
        if pkt[Ether].type != 0x8100:
            logging.info("not vlan pkt from neighbor, discard it")
            return
        if check_pkt_len(pkt) == 0:  # truncated packet
            logging.warning(str(pkt.show()))
            logging.warning(hexdump(pkt))
            return
        if not pkt.haslayer(TCP):
            logging.info("Warning! Not a TCP packet!")
            return

        peer_neigh = (pkt[IP].src, pkt[TCP].sport)
        peer_router = (pkt[IP].dst, pkt[TCP].dport)
        conn = self.search_conn(peer_neigh, peer_router)
        if conn is None:
            # only SYN packet can create connection
            if not _is_initial_syn(pkt[TCP].flags):
                logging.info("no conn,not syn,send reset")
                rst_pkt = self.prepare_rst_packet(pkt)
                sendp(rst_pkt, iface=self.interface_to_neigh, verbose=0)
                return
            conn = self._open_conn_from_neigh(peer_neigh, peer_router)

        conn.update_conn_time()
        if conn.recv_from_neigh(pkt) is None:
            return
        print("lqs@963 send_to_actor")
        conn.send_to_actor(pkt, self.interface_to_actor)
        # process rst and fin packet
        if pkt[TCP].flags & RST:
            logging.info("neigh send reset")
            self.rst_pkt_process(pkt, conn)
        self.fin_pkt_process(conn)

    def _send_rst_to_actor(self, pkt):
        """Answer *pkt* with a RST on the actor interface."""
        rst_pkt = self.prepare_rst_packet(pkt)
        sendp(rst_pkt, iface=self.interface_to_actor, verbose=0)

    def deal_actor_pkt_deep(self, pkt, actorid):
        """Process a VLAN-validated packet received from actor *actorid*."""
        pkt = Ether(str(pkt))
        if not pkt.haslayer(TCP):
            logging.info("Warning! Not a TCP packet!")
            return
        dst = pkt[IP].dst
        src = pkt[IP].src
        dport = pkt[TCP].dport
        sport = pkt[TCP].sport
        flags = pkt[TCP].flags
        logging.debug("recv pkt with dst=%s,src=%s,dport=%s,sport=%s", dst, src, dport, sport)
        if flags & RST == 0:
            self.update_Actor_List(actorid)

        # From the actor's point of view the neighbor is the destination.
        peer_neigh = (dst, dport)
        peer_router = (src, sport)
        conn = self.search_conn(peer_neigh, peer_router)

        if actorid == Actor_Worker and conn is None:
            logging.info("worker,conn=None")
            # only SYN packet can create connection
            if not _is_initial_syn(flags):
                logging.info("worker send packet,no conn,not syn,send reset")
                self._send_rst_to_actor(pkt)
                return
            logging.info("connection don't exist,recv a syn packet")
            logging.info("judge the same direction connection")
            old_conn = self.search_conn_1(peer_neigh, peer_router)
            if old_conn is not None:
                logging.info("find the old one and del it")
                self.del_conn(old_conn)
            logging.info("judge the opposite direction connection")
            old_conn = self.judge_same_connection_with_opposite_direction(peer_neigh, peer_router)
            if old_conn is not None:
                logging.info("find the opposite direction connection")
                if old_conn.direction == NEIGH_TO_ROUTER:
                    logging.info("already exist opposite direction connection ,send reset")
                    self._send_rst_to_actor(pkt)
                    return
            logging.info("create a new connection")
            conn = Connection(peer_neigh, peer_router)
            for entry in Actor_List:
                conn.create_actor(entry["actorid"])
            conn.actor[actorid].actor_port = sport
            conn.actor[actorid].neigh_port = dport
            conn.direction = ROUTER_TO_NEIGH
            with mutex:
                self.connection_list.append(conn)

        if actorid != Actor_Worker and conn is None:
            # search IP peer and actorid.actor_port
            conn = self.search_conn_Actor(peer_neigh, peer_router, actorid)
            if conn is None:
                # search IP peer and actorid.actor_port = None
                conn = self.search_conn_Actor_1(peer_neigh, peer_router, actorid)
                if conn is None:
                    logging.info("inspectior,conn=none,don't create,send rst")
                    logging.info("inspector send packet,no conn,send reset")
                    self._send_rst_to_actor(pkt)
                    return
                logging.info("the same Ip peer and conn.direction == ROUTER_TO_NEIGH,the actor's neigh_port and actor_port==None,means a new connection created by worker,waiting for the inspector to send syn ")
                if _is_initial_syn(flags):  # syn packet
                    conn.actor[actorid].actor_port = sport
                    conn.actor[actorid].neigh_port = dport

        logging.debug("conn=%s,conn.direction ==%s,conn.peer_neigh=%s,conn.peer_router=%s"
                      % (conn, conn.direction, conn.peer_neigh, conn.peer_router))
        conn.update_conn_time()
        actor = conn.actor[actorid]
        if actor.recv_from_actor(pkt) is None:
            return
        actor.send_to_neigh(pkt, self.interface_to_neigh)
        # process rst and fin packet
        if actorid == Actor_Worker and flags & RST:
            logging.info("received reset, del the conn")
            self.rst_pkt_process(pkt, conn)
            return
        self.fin_pkt_process(conn)

    def deal_actor_pkt(self, pkt):
        """Sniff callback for the actor interface: resolve the actor id, then relay."""
        print("i am in deal_actor_pkt")
        if not pkt.haslayer(Ether):
            return
        # analyse vlanid to get the actorid
        pkt = Ether(str(pkt))
        if pkt[Ether].type != 0x8100:
            logging.info('not vlan pkt from actor, discard it')
            return
        if check_pkt_len(pkt) == 0:  # truncated packet
            logging.warning(str(pkt.show()))
            logging.warning(hexdump(pkt))
            return
        vlanid_actor = pkt[Dot1Q].vlan
        logging.debug("actor pkt with vlan:%s", vlanid_actor)
        actorid = search_actorid_by_vlanid(vlanid_actor)
        if actorid is None:
            logging.warning("wrong vlanid, not corresponding to an actor")
            return
        if actorid > ACTOR_NUMBERS - 1:
            logging.warning("actorid is bigger than ACTOR_NUMBERS,don't process it")
            return
        self.deal_actor_pkt_deep(pkt, actorid)

    def sniff_neth(self):
        """Sniff the neighbor interface and feed packets to deal_neigh_pkt."""
        print("self.interface_to_neigh = %s" % (self.interface_to_neigh))
        sniff(prn=self.deal_neigh_pkt, store=0, iface=self.interface_to_neigh,
              filter='ether[13]>0x00 and ether[13]<0x15 and ether[28]=0x08 and ether[29]=0x00 and ether[39]=0x06')

    def sniff_seth(self):
        """Sniff the actor interface and feed packets to deal_actor_pkt."""
        print("self.interface_to_actor = %s" % (self.interface_to_actor))
        sniff(prn=self.deal_actor_pkt, store=0, iface=self.interface_to_actor,
              filter='tcp and ip src 192.168.10.1')

    def run(self):
        """Start the sniffers and collectors, restarting any that die.  Never returns."""
        workers = [
            ("neth_thread", self.sniff_neth),
            ("seth_thread", self.sniff_seth),
            ("garbage_collection_thread", self.conn_garbage_collection),
            ("actor_garbage_collection_thread", self.actor_garbage_collection),
        ]
        threads = {}
        for name, target in workers:
            threads[name] = _start_daemon_thread(target)
        while True:
            time.sleep(deadlock)
            for name, target in workers:
                if not threads[name].is_alive():
                    logging.info("%s is down", name)
                    threads[name] = _start_daemon_thread(target)


def main():
    """Load the configuration and run the relay unless single_test() returns 1."""
    bRun = single_test()
    if bRun == 1:
        return
    init_from_config()
    print("********************")
    for entry in Actor_List:
        logging.info(entry)
    mpp_tcp = MPP_TCP(MPP_If_List['mpp_south_if_name'], MPP_If_List['mpp_north_if_name'])
    mpp_tcp.run()


if __name__ == "__main__":
    main()