项目要求 使用 Dijkstra 算法(dial 实现)计算任意两点间的最短路; 使用最短路由配置任意两点间的通信业务; 将所有业务在可视化平台上进行展示。 Dijkstra 算法原理 基本思想 Dijkstra 算法使用了广度优先搜索解决赋权有向图或者无向图的单源最短路径问题,采用的是一种贪心的策略,构造一个循环,每次循环都增加一个顶点到最短路径树上,所选择的顶点即为在所有与树邻接的顶点中和源点最近的顶点, 其中就隐含了广度优先搜索的思想,逐步地发展最短路径树,直到它覆盖所有顶点。其中,记录每一个顶点的前继节点可以实现最短路的重建,对距离的更新可以保证现有路径为当前最优解。
基本步骤 初始化:使用 d(i)来表示顶点 i 的距离标记,初始化为无穷大;使用 p(i)来表示顶点 i 在最短路径树上的前继点,初始化为 null;使用 S 表示最短路径树上的顶点集合,初始化时加入源点 s,同时将 s 点的距离标记初始化为 0, 前继点为 null 对 s 点做更新,更新操作为:对需要更新的点 i 的所有邻接点,如果 i 点到源点的距离和 i 点到邻接点的权重之和小于邻接点到源点的的距离,由贪心算法求局部最优解的要求,将该邻接点到源点的距离标记更新为 i 点到源点的距离和 i 点到邻接点的权重之和,此时邻接点的前继点为 i。 循环扩张最小路径树,直到所有顶点都在该树上,循环内的操作为: 找出已经永久标记的顶点集合和未被标记的顶点集合之间的权重最小割边,将该割边上未被探索的顶点加入已被标记的顶点集合,同时更新对该顶点做更新,更新操作如(2)。 以上方法为单源多宿最短路的求解,若要求解单源单宿最短路,可以增加一个判断模块,当宿点被永久标记时终止循环。永久标记条件:当一点的所 有入度的另一端都已经被永久标记时,该点即可被永久标记。 示例
上图是一个无向有权图,若源点为 a,最小路径树的生成过程为:
a->b:0+1< ∞ ; a −> c: 0 + 6 < ∞. 所以 d[b] = 1, d[c] = 6,前继点为 a,进入 while 循环,选择最小割边,将 b 加入最小路径树,更新 b; 对 b 的邻接点做更新,1+2 < d[d] ,1+5 < d[e] ,所以d[d] = d[b] + w[b,d] = 3,d[e] = d[b] +w[b,e] = 6 ,将 d,e 的前继点设为 b,进入下一次循环,选择最小割边, 将 d 加入最小路径树,更新 d; 一直进行如上的循环,直到所有顶点都在最短路径树上。 算法实现 引进新的数据结构——桶 特殊之处 根据距离值的最大值留出足够数量的桶,桶的标记即为距离值,根据顶点的距离标记将顶点加入不同的桶中,执行 FindMin 操作时直接从 标记最小的桶读起,节约了比较的时间。同时,距离标记更新后要将顶点转移到 新的桶里面,
桶的数量 令 C 表示边权的最大值,则距离的最大值为 nC,同时还有 一个距离为无穷大的桶,所以总共需要 nC+1 个桶。
缺陷 nC 数量级的桶太消耗空间了,有大量的桶一直闲置。
改进:循环桶 由于边界点的距离标记不会超过 d[i]+C(i 为当前标记点,C 为权重最大值), 对顶点 x 的标号为 d[x]mod(C+1),最多需要 C+1 个桶。
对于同一个桶里面的实际上距离标记不相同的点不必做额外处理,因为第一次永久标记的节点距离标记肯定是最短的,该永久标记点的邻接点不可能在同一个桶中(边权重值最大为 C,桶的编号为距离标记对 C+1 取余后的值),除非两个邻接点和该点之间的边权值相同,即使如此,选其中的任意一个也没有问题。 考虑是否标记邻接点时只需要判断邻接点是否已经被永久标记,是则不再标记, 否则永久标记。
每永久标记一次,桶的头部都要往后一个桶移动,并且保证改变桶的头部之 前先把头部桶里面的所有节点清除出桶,然后永久标记。
实现方式比较 数组(队列)实现 此方法为最通用的算法实现方法,也是对读者来说最容易理解的一种实现方法。核心思想为用一个数据结构来维护所有的边界点,相当于所有边界边被分成不同的小集合,与同一个边界点相关联的那些边只记录一个最小值。相比与直接简单粗暴地每次都搜索所有边再筛选后进行更新距离的 Dijkstra 的最直接实现算法,此算法的运行时间从 O(nm) 优化到了 O(n*n+m)。
堆实现 此方法逻辑上和 1 类似,只是用堆来维护 V-X 集合。堆本身可以利用 Python 中的 heapq 模块较为简单地实现各种操作,如存储时就给元素进行排序,在调用弹出元素时会自行选择最大或最小的元素进行弹出。距离值更新后要将元素先从堆中删除,再插入。这减少了算法中的代码编写和算法的复杂度。此方法下总 RT 为 O(mlogn)。
桶实现 此方法逻辑上和 1 类似,只是用桶来维护 V-X 集合。对普通桶,令 C 表示边权的最大值,则桶的数目上限为 nC+1。此时 RT 为 O(m+nC)。循环桶则是减小了桶的用量,但是 RT 不变。因为每个桶被检查的数目依旧会增大。从多次实验的结果来看,此方法在 RT 上实际上优于方法2。
最终,我们选择使用循环桶来实现 Dijkstra 算法。
循环桶实现 dijkstra 算法 步骤 初始化桶,把源节点加入到桶中。 弹出最小距离标记节点。 将最小距离标记节点永久标记。 将永久标记节点的所有邻接节点入桶。 重复 bcd 操作,直到所有桶空,算法结束。 示例 上图为一个有向有权图。若源点为 1 号点,则最短路径树的生成过程为
首先初始化循环桶,由于最大边权重为 15,所以 C 为 15。将点 1 放入桶内, 因为此时将点 1 的距离设为了 0,因此放入桶头(初始化时设置桶头为第一个桶),如图 2.5 所示。
进入循环,弹出最小距离标记节点。此时先从桶头开始搜索,按顺序寻找第 一个不为空的桶。在此范例中顺序为顺时针。找到该桶后开始弹出此桶内的元素。 在此范例中弹出的元素是 1 号; 之后开始检索,首先判断 1 号是否已经被永久标记,判断结果为 1 号未被永 久标记,则此时将 1 号永久标记。标记的同时永久记录此时的记录值,不再更改。 标记后搜索与 1 号相连的节点,将相应节点按照此时对应的距离对 16 取余后 的结果加入桶中。在本例中,加入 2 号(入 1 桶),再加入 3 号(入 12 桶)。如图 2.6 所示; 此时一轮循环已经结束,开始进行下一轮循环。桶头开始按顺时针方向移动, 直到找到下一个非空桶。在本例中它会发现 1 桶。开始清空此时的 1 桶。如图 2.7 所示; 2 号点被弹出,判断后进行永久标记并开始搜索和 2 号点相连的点。只要点未被永久标记,就按照 d 中规则将它们按照对应的距离值入相应的桶。需要注意的是,此时桶内会有两个 3 号点,因为它的距离值被更新过。但由于优先被弹出并且标记的 3 号点对应的距离一定更小,所以这种加入方式等效于让 3 号点移动到了能更快被弹出的桶内。见图 2.8; 重复上述过程,直到整个桶内无空桶,理论上整个图就完成了遍历,按照永久标记时记录的距离值可生成最小生成树。
代码实现 dijkstra 算法的实现 (1)初始化,将距离标记置为 99999,前继节点为空,源主机的距离标记赋为 0 。
1 2 3 4 5 6 7 8 9 def dijkstra (self,src_dpid,dst_dpid,src_port,dst_port ): 2. c = 29 adjacent = self.get_adjacent(self.links) buckets = BucketSet(c+1 ) node_s = (0 ,src_dpid) d = {} for u in self.switches: d[u] = 99999 d[src_dpid] = 0 pre = {}
(2)用循环桶来维护未标记点的集合,其中循环桶是自定义的一个类,需在前面中引用:
1 from BucketSet import BucketSet
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 buckets.add_thing(node_s) flag = 1 while not buckets.SetEmpty() and flag == 1 : min_list = buckets.pop_min() while not len (min_list) == 0 : min_node = min_list.pop() if (min_node[1 ] == dst_dpid): flag = 0 break for u in self.switches: if (u in adjacent[min_node[1 ]]): for link in self.links: if (link['src_dpid' ] == min_node[1 ]) and (link['dst_dpid' ]== u): delay = link['delay' ] break if (d[min_node[1 ]]+delay < d[u]): pre[u] = min_node[1 ] d[u] = d[min_node[1 ]] + delay buckets.add_thing((d[u],u))
(3)重建最短路径
1 2 3 4 5 6 7 8 9 getpath = [] s = dst_dpid while s != src_dpid: getpath.append(s) s = pre[s] getpath.append(src_dpid) getpath.reverse() self.minpath = getpath
(4)提供 ryu 配置最短路所需的端口信息,使流表的下发能够一步到位
1 2 3 4 5 6 7 8 9 10 11 12 ryu_path = [] in_port = src_port for s1,s2 in zip (getpath[:-1 ],getpath[1 :]): for link in self.links: if (link['src_dpid' ] == s1) and (link['dst_dpid' ] == s2): out_port = link['src_port_no' ] ryu_path.append((s1,in_port,out_port)) in_port = link['dst_port_no' ] if s2 == dst_dpid: ryu_path.append((s2,in_port,dst_port)) break return ryu_path
(5)求最短路径上的时延
1 2 3 4 5 6 7 def sum_delay (self,path,links ): delay = 0 for j in range (len (path)-1 ): for link in links: if (link['src_dpid' ] == path[j]) and (link['dst_dpid' ] == path[j+1 ]): delay += link['delay' ] return delay
(6)获取邻接点
1 2 3 4 5 6 7 8 9 def get_adjacent (self,links ): adjacent = {} for switch in self.switches: adjacent.setdefault(switch,[]) for switch in self.switches: for link in links: if link['src_dpid' ] == switch: adjacent[switch].append(link['dst_dpid' ]) return adjacent
桶类的实现 (1)首先创建循环桶类,并对一些基础变量进行初始化
1 2 3 4 class BucketSet (object ): header_loc = 0 things_number = 0
(2)定义用于初始化的函数
1 2 3 4 5 6 7 8 9 def __init__ (self,buc_num ): self.bucket_num = buc_num self.buckets = [None ]*buc_num self.init_buckets(buc_num) self.header = self.buckets[0 ] def init_buckets (self,bucket_num ): for i in range (bucket_num): self.buckets[i] = BucketSet.Bucket(i)
(3)定义用于使头桶指针后移的函数。因为每次对桶的元素取出都是针对头桶, 所以在头桶空后需要调用此函数实现对后续桶的搜索
1 2 3 def move_header (self ): self.header_loc = ((self.header_loc + 1 )% self.bucket_num) self.header = self.buckets[self.header_loc]
(4)定义用于计算入桶的节点应该去哪个桶的函数
1 2 def Length_cal (self,length ): return (length % self.bucket_num)
(5)定义用于处理节点加入桶的函数,作为和外部的关键接口
1 2 3 4 5 def add_thing (self,thing ): length = thing[0 ] id = self.Length_cal(length) self.buckets[id ].add_thing(thing) self.things_number += 1
(6)定义判断函数,用于判断整个循环桶是否全空,也就是确定整个 Dijkstra 算法何时结束
1 2 def SetEmpty (self ): return self.things_number == 0
(7)定义用于弹出桶中需要被永久标记的最短距离点的函数
1 2 3 4 5 6 def pop_min (self ): while self.header.is_empty(): self.move_header() min_lists = self.header.pop_out() self.things_number = self.things_number - len (min_lists) return
(8)在循环桶类内部定义桶类,即用于组成循环桶的小桶。依旧从初始化函数开始定义
1 2 3 4 5 6 class Bucket (object ): things_amount = 0 def __init__ (self,bucketId ): self.bucket = list () self.id = bucketId
(9)定义往小桶内加入节点的函数
1 2 3 def add_thing (self,thing ): self.bucket.append(thing) self.things_amount += 1
(10)定义弹出小桶内节点数据的函数
1 2 3 4 5 def pop_out (self ): things = self.bucket.copy() self.bucket.clear() self.things_amount = 0 return thing
(11)定义判断小桶是否为空的函数
1 2 def is_empty (self ): return self.things_amount == 0
RYU通信 (1)初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 class Kruskaltest (app_manager.RyuApp): OFP_VERSIONS=[ofproto_v1_3.OFP_VERSION] def __init__ (self,*args,**kwargs ): super (Kruskaltest,self).__init__(*args,**kwargs) self.mac_to_port={} self.topology_api_app=self self.sleep_interval = 0.5 self.switches = [] self.links = [] self.mst_links = [] self.pathes = {} self.banport = {}
(2)流表配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER ) def switch_features_handler (self,ev ): datapath=ev.msg.datapath ofproto=datapath.ofproto ofp_parser=datapath.ofproto_parser match =ofp_parser.OFPMatch() actions=[ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUF FER)] self.add_flow(datapath,0 ,match ,actions) def add_flow (self,datapath,priority,match ,actions ): ofproto=datapath.ofproto ofp_parser=datapath.ofproto_parser inst=[ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)] mod=ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,match =match ,instructions=inst) datapath.send_msg(mod)
(3)拓扑发现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @set_ev_cls(event.EventSwitchEnter,[CONFIG_DISPATCHER,MAIN_DISPATCHER] ) def switch_enter_handler (self,ev ): print ('==================================' ) print ('switch entered...' ) self.switches,self.links = self.get_topology() self.banport = self.find_banport(self.links[:]) for link in self.mst_links: print ("{} {}" .format (link['src_dpid' ],link['dst_dpid' ])) print ('==================================' ) time.sleep(self.sleep_interval) def get_topology (self ): switch_list = get_switch(self.topology_api_app,None ) switches = [switch.dp.id for switch in switch_list] print ('switches:{}' .format (switches)) for switch in switches: self.banport.setdefault(switch,[]) print ('------------------------------------' ) print ("banport first_list is{}" .format (self.banport)) link_list = get_link(self.topology_api_app,None ) links = [{'src_dpid' :link.src.dpid,'src_port_no' :link.src.port_no, 12. 'dst_dpid' :link.dst.dpid,'dst_port_no' :link.dst.port_no} for link in link_list] print ("the getted link_list is{}" .format (links)) file = open ("./topo7.json" ,"r" ) topoinfo = json.load(file) for link in links: for edge in topoinfo['links' ]: if (["s" +str (link["src_dpid" ]),"s" +str (link["dst_dpid" ])] == edge["vertexs" ] or ["s" +str (link["dst_dpid" ]),"s" +str (link["src_dpid" ])] == edge["vertexs" ]): link['delay' ] = edge['delay' ] break
(4)ryu 配置最短路
1 2 3 4 5 6 7 8 9 10 11 12 def configure_path (self,path,src,dst,datapath ): print ("configure the shortest path" ) path_print=src for switch,in_port,out_port in path: ofp_parser=datapath.ofproto_parser match =ofp_parser.OFPMatch(in_port=in_port,eth_src=src,eth_dst=dst) actions=[ofp_parser.OFPActionOutput(out_port)] self.add_flow(datapath,1 ,match ,actions) path_print+="-->{}-[{}]-{}" .format (in_port,switch,out_port) delay = self.sum_delay(self.minpath,self.links) path_print+="-->" +dst print ("the shortest path is:{}" .format (path_print),'delay:' ,delay)
(5)对 packetin 的修改
1 2 path=self.dijkstra(src_switch,dst_switch,first_port,final_port) self.configure_path(path,src,dst,datapath)
成果展示 拓扑的构建 (1)mininet 侧
(2)RYU 侧
(3)二十四节点网络拓扑可视化
Dijkstra 算法实现 (1)mininet 侧:连通性良好
(2)时延现象(以 h1 ping h19 为例)
(3)ryu 侧
(4)最短路可视化 (黄色路径为最短路,黑色路径为未经过路径)