最大流量和線性分配問題
已發表: 2022-03-11這裡有一個問題:您的企業指派承包商來履行合同。 您查看您的名冊並確定哪些承包商可用於一個月的參與,您查看您的可用合同以查看其中哪些是為期一個月的任務。 鑑於您知道每個承包商履行每份合同的效率如何,您如何分配承包商以最大限度地提高該月的整體效率?
這是分配問題的一個例子,這個問題可以用經典的匈牙利算法來解決。
匈牙利算法(也稱為 Kuhn-Munkres 算法)是一種多項式時間算法,它在加權二分圖中最大化權重匹配。 在這裡,承包商和合同可以建模為二分圖,它們的有效性是承包商和合同節點之間的邊的權重。
在本文中,您將了解使用 Edmonds-Karp 算法解決線性分配問題的匈牙利算法的實現。 您還將了解 Edmonds-Karp 算法如何是對 Ford-Fulkerson 方法的輕微修改,以及這種修改的重要性。
最大流量問題
最大流量問題本身可以非正式地描述為將一些流體或氣體通過管道網絡從單一來源移動到單一終端的問題。 這是在假設網絡中的壓力足以確保流體或氣體不會滯留在任何長度的管道或管道配件(不同長度管道相遇的地方)的情況下完成的。
通過對圖進行某些更改,可以將分配問題轉化為最大流問題。
預賽
解決這些問題所需的想法出現在許多數學和工程學科中,通常相似的概念有不同的名稱並以不同的方式表示(例如,鄰接矩陣和鄰接列表)。 由於這些想法非常深奧,因此要選擇如何為任何給定設置定義這些概念。
本文不會假設除了一些介紹性的集合論之外的任何先驗知識。
這篇文章中的實現將問題表示為有向圖(有向圖)。
有向圖
有向圖有兩個屬性, setOfNodes和setOfArcs 。 這兩個屬性都是集合(無序集合)。 在這篇文章的代碼塊中,我實際上使用的是 Python 的 freezeset,但這個細節並不是特別重要。
DiGraph = namedtuple('DiGraph', ['setOfNodes','setOfArcs'])(注意:本文以及本文中的所有其他代碼都是用 Python 3.6 編寫的。)
節點
一個節點n由兩個屬性組成:
n.uid:唯一標識符。這意味著對於任何兩個節點
x和y,
x.uid != y.uid-
n.datum:這代表任何數據對象。
Node = namedtuple('Node', ['uid','datum'])弧線
弧a由三個屬性組成:
a.fromNode:這是一個node ,如上定義。a.toNode:這是一個node ,如上定義。a.datum:這代表任何數據對象。
Arc = namedtuple('Arc', ['fromNode','toNode','datum']) 有向圖中的弧集表示有向圖中節點上的二元關係。 arc a的存在意味著a.fromNode和a.toNode之間存在關係。
在有向圖中(相對於無向圖), a.fromNode和a.toNode之間存在關係並不意味著a.toNode和a.fromNode之間存在類似的關係。
這是因為在無向圖中,所表達的關係不一定是對稱的。
有向圖
節點和弧可用於定義有向圖,但為方便起見,在下面的算法中,有向圖將使用字典來表示。
這是一種可以將上面的圖形表示轉換為類似於此的字典表示的方法:
def digraph_to_dict(G): G_as_dict = dict([]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) pdg(G) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) pdg(G) raise KeyError(err_msg) G_as_dict[a.fromNode] = (G_as_dict[a.fromNode].union(frozenset([a]))) if (a.fromNode in G_as_dict) else frozenset([a]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if a.toNode not in G_as_dict: G_as_dict[a.toNode] = frozenset([]) return G_as_dict這是另一個將其轉換為字典的字典,另一個有用的操作:
def digraph_to_double_dict(G): G_as_dict = dict([]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.fromNode not in G_as_dict): G_as_dict[a.fromNode] = dict({a.toNode : frozenset([a])}) else: if(a.toNode not in G_as_dict[a.fromNode]): G_as_dict[a.fromNode][a.toNode] = frozenset([a]) else: G_as_dict[a.fromNode][a.toNode] = G_as_dict[a.fromNode][a.toNode].union(frozenset([a])) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if a.toNode not in G_as_dict: G_as_dict[a.toNode] = dict({}) return G_as_dict 當文章談到由字典表示的有向圖時,它會提到G_as_dict來引用它。
有時,通過它的uid (唯一標識符)從有向圖G中獲取節點會很有幫助:
def find_node_by_uid(find_uid, G): nodes = {n for n in G.setOfNodes if n.uid == find_uid} if(len(nodes) != 1): err_msg = 'Node with uid {find_uid!s} is not unique.'.format(**locals()) raise KeyError(err_msg) return nodes.pop()在定義圖時,人們有時使用術語節點和頂點來指代同一個概念。 術語arc和 edge 也是如此。
Python 中兩種流行的圖形表示是一種使用字典,另一種使用對象來表示圖形。 這篇文章中的表示介於這兩種常用表示之間。
這是我的有向圖表示。 有很多喜歡的,但這個是我的。
步行和路徑
令S_Arcs是有向圖G中弧的有限序列(有序集合),使得如果a是S_Arcs中除最後一個以外的任何弧,並且b在序列中跟隨a ,則必須有一個節點n = a.fromNode in G.setOfNodes使得a.toNode = b.fromNode 。
從S_Arcs中的第一個弧開始,一直持續到S_Arcs中的最後一個弧,收集(按遇到的順序)在S_Arcs中每兩個連續弧之間定義的所有節點n 。 標記在此操作期間收集的有序節點集合S_{Nodes} 。
def path_arcs_to_nodes(s_arcs): s_nodes = list([]) arc_it = iter(s_arcs) step_count = 0 last = None try: at_end = False last = a1 = next(arc_it) while (not at_end): s_nodes += [a1.fromNode] last = a2 = next(arc_it) step_count += 1 if(a1.toNode != a2.fromNode): err_msg = "Error at index {step_count!s} of Arc sequence.".format(**locals()) raise ValueError(err_msg) a1 = a2 except StopIteration as e: at_end = True if(last is not None): s_nodes += [last.toNode] return s_nodes如果任何節點在序列
S_Nodes中出現不止一次,則調用S_Arcsa Walk on digraphG。否則,在有向圖
G上調用S_Arcs從list(S_Nodes)[0]到list(S_Nodes)[-1]的路徑。
源節點
如果s在G.setOfNodes中並且G.setOfArcs不包含滿足a.toNode = s弧a ,則將節點s稱為有向圖G中的源節點。
def source_nodes(G): to_nodes = frozenset({a.toNode for a in G.setOfArcs}) sources = G.setOfNodes.difference(to_nodes) return sources終端節點
如果t在G.setOfNodes中並且G.setOfArcs不包含滿足a.fromNode = t的弧a ,則將節點t稱為有向圖G中的終端節點。
def terminal_nodes(G): from_nodes = frozenset({a.fromNode for a in G.setOfArcs}) terminals = G.setOfNodes.difference(from_nodes) return terminals削減和 st 削減
連通有向圖G的cut是G.setOfArcs的弧子集,它劃分G中的節點集G.setOfNodes 。 如果 G.setOfNodes 中的每個節點n並且在G.setOfNodes中至少有一個弧a使得n = a.fromNode G.setOfArcs n = a.toNode ,但a.fromNode != a.toNode ,則G是連接的。
Cut = namedtuple('Cut', ['setOfCutArcs']) 上面的定義指的是arcs的子集,但它也可以定義G.setOfNodes的節點的分區。
對於函數predecessors_of和successors_of , n是有向圖G的集合G.setOfNodes中的一個節點, cut是G的一個cut :
def cut_predecessors_of(n, cut, G): allowed_arcs = G.setOfArcs.difference(frozenset(cut.setOfCutArcs)) predecessors = frozenset({}) previous_count = len(predecessors) reach_fringe = frozenset({n}) keep_going = True while( keep_going ): reachable_from = frozenset({a.fromNode for a in allowed_arcs if (a.toNode in reach_fringe)}) reach_fringe = reachable_from predecessors = predecessors.union(reach_fringe) current_count = len(predecessors) keep_going = current_count - previous_count > 0 previous_count = current_count return predecessors def cut_successors_of(n, cut, G): allowed_arcs = G.setOfArcs.difference(frozenset(cut.setOfCutArcs)) successors = frozenset({}) previous_count = len(successors) reach_fringe = frozenset({n}) keep_going = True while( keep_going ): reachable_from = frozenset({a.toNode for a in allowed_arcs if (a.fromNode in reach_fringe)}) reach_fringe = reachable_from successors = successors.union(reach_fringe) current_count = len(successors) keep_going = current_count - previous_count > 0 previous_count = current_count return successors 令cut為有向圖G的一個剪切。
def get_first_part(cut, G): firstPartFringe = frozenset({a.fromNode for a in cut.setOfCutArcs}) firstPart = firstPartFringe for n in firstPart: preds = cut_predecessors_of(n,cut,G) firstPart = firstPart.union(preds) return firstPart def get_second_part(cut, G): secondPartFringe = frozenset({a.toNode for a in cut.setOfCutArcs}) secondPart = secondPartFringe for n in secondPart: succs= cut_successors_of(n,cut,G) secondPart = secondPart.union(succs) return secondPart cut是有向圖G的一個切如果: (get_first_part(cut, G).union(get_second_part(cut, G)) == G.setOfNodes) and (len(get_first_part(cut, G).intersect(get_second_part(cut, G))) == 0)如果(x in get_first_part(cut, G)) and (y in get_second_part(cut, G) ) and (x != y)則cut稱為xy 切割。 當xy 割cut中的節點x是源節點且xy割割中的節點y是終端節點時,則該割稱為st 割。
STCut = namedtuple('STCut', ['s','t','cut'])流網絡
您可以使用有向圖G來表示流網絡。
分配每個節點n ,其中n在G.setOfNodes中是一個n.datum ,它是一個FlowNodeDatum :
FlowNodeDatum = namedtuple('FlowNodeDatum', ['flowIn','flowOut']) 分配每個弧a ,其中a在G.setOfArcs中, a.datum是FlowArcDatum 。
FlowArcDatum = namedtuple('FlowArcDatum', ['capacity','flow']) flowNodeDatum.flowIn和flowNodeDatum.flowOut是正實數。
flowArcDatum.capacity和flowArcDatum.flow也是正實數。
對於G.setOfNodes中的每個節點節點n :
n.datum.flowIn = sum({a.datum.flow for a in G.Arcs if a.toNode == n}) n.datum.flowOut = sum({a.datum.flow for a in G.Arcs if a.fromNode == n}) Digraph G現在代表一個流網絡。
G的流是指G中所有弧a的a.flow 。
可行的流程
讓有向圖G代表一個流網絡。
如果滿足以下條件,則由G表示的流網絡具有可行流:
對於
G.setOfNodes中的每個節點n,源節點和終端節點除外:n.datum.flowIn = n.datum.flowOut。對於
G.setOfNodes中的每個弧a:a.datum.capacity <= a.datum.flow。
條件 1 稱為守恆約束。
條件 2 稱為容量約束。
切割能力
由有向圖G表示的流網絡的源節點s和終端節點t的st 割stCut的割容量為:
def cut_capacity(stCut, G): part_1 = get_first_part(stCut.cut,G) part_2 = get_second_part(stCut.cut,G) s_part = part_1 if stCut.s in part_1 else part_2 t_part = part_1 if stCut.t in part_1 else part_2 cut_capacity = sum({a.datum.capacity for a in stCut.cut.setOfCutArcs if ( (a.fromNode in s_part) and (a.toNode in t_part) )}) return cut_capacity最小產能削減
令stCut = stCut(s,t,cut)是由有向圖G表示的流網絡的st 割。
stCut是由G表示的流網絡的最小容量割,如果在該流網絡中沒有其他st 割stCutCompetitor使得:
cut_capacity(stCut, G) < cut_capacity(stCutCompetitor, G)剝離流動
我想指的是有向圖,它是獲取有向圖G並從 G.setOfNodes 中的所有節點以及G.setOfNodes中的所有弧中剝離所有流數據的G.setOfArcs 。
def strip_flows(G): new_nodes= frozenset( (Node(n.uid, FlowNodeDatum(0.0,0.0)) for n in G.setOfNodes) ) new_arcs = frozenset([]) for a in G.setOfArcs: new_fromNode = Node(a.fromNode.uid, FlowNodeDatum(0.0,0.0)) new_toNode = Node(a.toNode.uid, FlowNodeDatum(0.0,0.0)) new_arc = Arc(new_fromNode, new_toNode, FlowArcDatum(a.datum.capacity, 0.0)) new_arcs = new_arcs.union(frozenset([new_arc])) return DiGraph(new_nodes, new_arcs)最大流量問題
表示為有向圖G的流網絡,G.setOfNodes 中的源節點s和G.setOfNodes中的終端節點t ,如果G.setOfNodes , G可以表示最大流問題:
(len(list(source_nodes(G))) == 1) and (next(iter(source_nodes(G))) == s) and (len(list(terminal_nodes(G))) == 1) and (next(iter(terminal_nodes(G))) == t)標記此表示:
MaxFlowProblemState = namedtuple('MaxFlowProblemState', ['G','sourceNodeUid','terminalNodeUid','maxFlowProblemStateUid']) 其中sourceNodeUid = s.uid 、 terminalNodeUid = t.uid和maxFlowProblemStateUid是問題實例的標識符。
最大流量解決方案
讓maxFlowProblem代表一個最大流量問題。 maxFlowProblem的解決方案可以由表示為有向圖H的流網絡表示。
Digraph H是輸入python maxFlowProblem上的最大流量問題的可行解決方案,如果:
strip_flows(maxFlowProblem.G) == strip_flows(H)。H是一個流網絡,具有可行流。
如果除了1和2:
- 不可能有由有向圖
K表示的其他流網絡,這樣strip_flows(G) == strip_flows(K)和find_node_by_uid(t.uid,G).flowIn < find_node_by_uid(t.uid,K).flowIn。
那麼H也是maxFlowProblem的最優解。
換句話說,可行的最大流量解決方案可以用有向圖表示,其中:
與對應的最大流量問題的有向圖
G相同,但任何節點和弧的n.datum.flowIn、n.datum.flowOut和a.datum.flow可能不同。表示具有可行流的流網絡。
並且,如果另外,它可以代表最佳的最大流量解決方案:
- 最大流問題中終端節點對應的節點的
flowIn盡可能大(條件1和2仍然滿足時)。
如果有向圖H表示可行的最大流量解決方案: find_node_by_uid(s.uid,H).flowOut = find_node_by_uid(t.uid,H).flowIn這遵循最大流量,最小切割定理(下面討論)。 非正式地,由於假設H具有可行的流,這意味著流在穿過任何(其他)節點(守恆約束)時既不能“創建”(源節點s除外)也不能“銷毀”(終端節點t除外)。
由於最大流問題僅包含單個源節點s和單個終端節點t ,因此在s處“創建”的所有流必須在t處“銷毀”,否則流網絡沒有可行流(將違反守恆約束)。
設有向圖H表示可行的最大流量解; 上面的值稱為H的st 流值。
讓:
mfps=MaxFlowProblemState(H, maxFlowProblem.sourceNodeUid, maxFlowProblem.terminalNodeUid, maxFlowProblem.maxFlowProblemStateUid) 這意味著mfps是maxFlowProblem的後繼狀態,這僅意味著mfps與 maxFlowProblem 完全一樣,但maxFlowProblem中弧a.flow a.flow弧a a maxFlowProblem.setOfArcs不同mfps.setOfArcs 。
def get_mfps_flow(mfps): flow_from_s = find_node_by_uid(mfps.sourceNodeUid,mfps.G).datum.flowOut flow_to_t = find_node_by_uid(mfps.terminalNodeUid,mfps.G).datum.flowIn if( (flow_to_t - flow_from_s) > 0): raise Exception('Infeasible st flow') return flow_to_t 這是mfps及其關聯的maxFlowProb的可視化。 圖像中的每條弧a都有一個標籤,這些標籤是a.datum.flowFrom / a.datum.flowTo ,圖像中的每個節點n都有一個標籤,這些標籤是n.uid {n.datum.flowIn / a.datum.flowOut} 。
st 截流
讓mfps代表MaxFlowProblemState ,讓stCut代表mfps.G的一個mfps.G 。 stCut的切割流定義為:
def get_stcut_flow(stCut,mfps): s = find_node_by_uid(mfps.sourceNodeUid, mfps.G) t = find_node_by_uid(mfps.terminalNodeUid, mfps.G) part_1 = get_first_part(stCut.cut,mfps.G) part_2 = get_second_part(stCut.cut,mfps.G) s_part = part_1 if s in part_1 else part_2 t_part = part_1 if t in part_1 else part_2 s_t_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in s_part) ) t_s_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in t_part) ) cut_flow = s_t_sum - t_s_sum return cut_flowst cut flow是從包含源節點的分區到包含終端節點的分區的流量總和減去從包含終端節點的分區到包含源節點的分區的流量總和。
最大流量,最小切割
讓maxFlowProblem表示最大流量問題,並讓maxFlowProblem的解決方案由表示為有向圖H的流網絡表示。
令minStCut為maxFlowProblem.G表示的流網絡的最小容量切割。
因為在最大流問題中,流僅起源於單個源節點並終止於單個終端節點,並且由於容量約束和守恆約束,我們知道進入maxFlowProblem.terminalNodeUid的所有流必須穿過任何st cut ,特別是它必須跨越minStCut 。 這意味著:
get_flow_value(H, maxFlowProblem) <= cut_capacity(minStCut, maxFlowProblem.G)解決最大流量問題
解決最大流量問題maxFlowProblem的基本思想是從有向圖H表示的最大流量解開始。 這樣的起點可以是H = strip_flows(maxFlowProblem.G) 。 然後的任務是使用H並通過對H.setOfArcs中某些弧a的a.datum.flow值的一些貪婪修改來產生由有向圖K表示的另一個最大流量解決方案,這樣K仍然不能表示具有可行流的流網絡和get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem) 。 只要這個過程繼續下去,最近遇到的最大流量解決方案( K ) 的質量 ( get_flow_value(K, maxFlowProblem) ) 就優於已找到的任何其他最大流量解決方案。 如果該過程達到了它知道不可能有其他改進的程度,則該過程可以終止並且它將返回最優最大流量解決方案。
上面的描述是一般性的,並跳過了許多證明,例如這樣的過程是否可能或可能需要多長時間,我將提供更多細節和算法。
最大流量,最小割定理
從 Ford 和 Fulkerson 所著的 Flows in Networks 一書中,最大流最小割定理(定理 5.1)的陳述是:
對於任何網絡,從
s到t的最大流量值等於分離s和t的所有切割的最小切割容量。
使用這篇文章中的定義,可以轉化為:
由表示為有向圖H的流網絡表示的maxFlowProblem的解決方案是最優的,如果:
get_flow_value(H, maxFlowProblem) == cut_capacity(minStCut, maxFlowProblem.G)我喜歡這個定理證明,維基百科有另一個證明。
最大流最小割定理用於證明Ford-Fulkerson方法的正確性和完備性。
我還將在擴充路徑後的部分中給出定理的證明。
Ford-Fulkerson 方法和 Edmonds-Karp 算法
CLRS 定義了 Ford-Fulkerson 方法,如下所示(第 26.2 節):
FORD-FULKERSON-METHOD(G, s, t): initialize flow f to 0 while there exists an augmenting path p in the residual graph G_f augment flow f along殘差圖
表示為有向圖G的流網絡的殘差圖可以表示為有向圖G_f :
ResidualDatum = namedtuple('ResidualDatum', ['residualCapacity','action']) def agg_n_to_u_cap(n,u,G_as_dict): arcs_out = G_as_dict[n] return sum([a.datum.capacity for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ]) def agg_n_to_u_flow(n,u,G_as_dict): arcs_out = G_as_dict[n] return sum([a.datum.flow for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ]) def get_residual_graph_of(G): G_as_dict = digraph_to_dict(G) residual_nodes = G.setOfNodes residual_arcs = frozenset([]) for n in G_as_dict: arcs_from = G_as_dict[n] nodes_to = frozenset([find_node_by_uid(a.toNode.uid,G) for a in arcs_from]) for u in nodes_to: n_to_u_cap_sum = agg_n_to_u_cap(n,u,G_as_dict) n_to_u_flow_sum = agg_n_to_u_flow(n,u,G_as_dict) if(n_to_u_cap_sum > n_to_u_flow_sum): flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL) residual_arcs = residual_arcs.union( frozenset([Arc(n,u,datum=ResidualDatum(flow, 'push'))]) ) if(n_to_u_flow_sum > 0.0): flow = round(n_to_u_flow_sum, TOL) residual_arcs = residual_arcs.union( frozenset([Arc(u,n,datum=ResidualDatum(flow, 'pull'))]) ) return DiGraph(residual_nodes, residual_arcs)agg_n_to_u_cap(n,u,G_as_dict)返回 G.setOfArcs 子集中所有弧的a.datum.capacity的總和,如果G.setOfArcsa.fromNode = n和a.toNode = u,則弧a在子集中。agg_n_to_u_cap(n,u,G_as_dict)返回 G.setOfArcs 子集中所有弧的a.datum.flow的總和,如果G.setOfArcsa.fromNode = n和a.toNode = u,弧a在子集中。
簡而言之,殘差圖G_f表示可以在有向圖G上執行的某些動作。
有向圖G表示的流網絡G.setOfNodes中的每一對節點n,u可以在G的殘差圖G_f中生成 0、1 或 2 條弧。
如果在
G.setOfArcs中沒有弧a使得a.fromNode = n和a.toNode = u,則對n,u不會在G_f中生成任何弧。對
n,u在G_f.setOfArcs中生成弧a,其中a表示標記為從n到u的推流弧aa = Arc(n,u,datum=ResidualNode(n_to_u_cap_sum - n_to_u_flow_sum))如果n_to_u_cap_sum > n_to_u_flow_sum。對
n,u在G_f.setOfArcs中生成弧a,其中a表示標記為從n到u的拉流弧的弧a = Arc(n,u,datum=ResidualNode(n_to_u_cap_sum - n_to_u_flow_sum))如果n_to_u_flow_sum > 0.0。
G_f.setOfArcs中的每個推流弧表示將總共x <= n_to_u_cap_sum - n_to_u_flow_sum流添加到 G.setOfArcs 子集中的弧的G.setOfArcs,其中弧a在子集中,如果a.fromNode = n和a.toNode = u。G_f.setOfArcs中的每個拉流弧表示將總計x <= n_to_u_flow_sum流減去 G.setOfArcs 子集中的弧的G.setOfArcs,其中如果a.fromNode = n和a.toNode = u,弧a在子集中。
在G中的適用弧上從G_f執行單獨的推或拉動作可能會生成一個沒有可行流的流網絡,因為在生成的流網絡中可能違反容量約束或守恆約束。
這是上一個最大流量解決方案可視化示例的殘差圖的可視化,在可視化中,每條弧a代表a.residualCapacity 。
增強路徑
令maxFlowProblem為最大流量問題,令G_f = get_residual_graph_of(G)為maxFlowProblem.G的殘差圖。
maxFlowProblem的增廣路徑augmentingPath是從find_node_by_uid(maxFlowProblem.sourceNode,G_f)到find_node_by_uid(maxFlowProblem.terminalNode,G_f)的任何路徑。
事實證明,增廣路徑augmentingPath可以應用於由有向圖H表示的最大流量解決方案,生成由有向圖K表示的另一個最大流量解決方案,其中get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem)如果H不是最優的。
這是如何做:
def augment(augmentingPath, H): augmentingPath = list(augmentingPath) H_as_dict = digraph_to_dict(H) new_nodes = frozenset({}) new_arcs = frozenset({}) visited_nodes = frozenset({}) visited_arcs = frozenset({}) bottleneck_residualCapacity = min( augmentingPath, key=lambda a: a.datum.residualCapacity ).datum.residualCapacity for x in augmentingPath: from_node_uid = x.fromNode.uid if x.datum.action == 'push' else x.toNode.uid to_node_uid = x.toNode.uid if x.datum.action == 'push' else x.fromNode.uid from_node = find_node_by_uid( from_node_uid, H ) to_node = find_node_by_uid( to_node_uid, H ) load = bottleneck_residualCapacity if x.datum.action == 'push' else -bottleneck_residualCapacity for a in H_as_dict[from_node]: if(a.toNode == to_node): test_sum = a.datum.flow + load new_flow = a.datum.flow new_from_node_flow_out = a.fromNode.datum.flowOut new_to_node_flow_in = a.toNode.datum.flowIn new_from_look = {n for n in new_nodes if (n.uid == a.fromNode.uid)} new_to_look = {n for n in new_nodes if (n.uid == a.toNode.uid) } prev_from_node = new_from_look.pop() if (len(new_from_look)>0) else a.fromNode prev_to_node = new_to_look.pop() if (len(new_to_look)>0) else a.toNode new_nodes = new_nodes.difference(frozenset({prev_from_node})) new_nodes = new_nodes.difference(frozenset({prev_to_node})) if(test_sum > a.datum.capacity): new_flow = a.datum.capacity new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + a.datum.capacity new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow + a.datum.capacity load = test_sum - a.datum.capacity elif(test_sum < 0.0): new_flow = 0.0 new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow load = test_sum else: new_flow = test_sum new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + new_flow new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow + new_flow load = 0.0 new_from_node_flow_out = round(new_from_node_flow_out, TOL) new_to_node_flow_in = round(new_to_node_flow_in, TOL) new_flow = round(new_flow, TOL) new_from_node = Node(prev_from_node.uid, FlowNodeDatum(prev_from_node.datum.flowIn, new_from_node_flow_out)) new_to_node = Node(prev_to_node.uid, FlowNodeDatum(new_to_node_flow_in, prev_to_node.datum.flowOut)) new_arc = Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, new_flow)) visited_nodes = visited_nodes.union(frozenset({a.fromNode,a.toNode})) visited_arcs = visited_arcs.union(frozenset({a})) new_nodes = new_nodes.union(frozenset({new_from_node, new_to_node})) new_arcs = new_arcs.union(frozenset({new_arc})) not_visited_nodes = H.setOfNodes.difference(visited_nodes) not_visited_arcs = H.setOfArcs.difference(visited_arcs) full_new_nodes = new_nodes.union(not_visited_nodes) full_new_arcs = new_arcs.union(not_visited_arcs) G = DiGraph(full_new_nodes, full_new_arcs) full_new_arcs_update = frozenset([]) for a in full_new_arcs: new_from_node = a.fromNode new_to_node = a.toNode new_from_node = find_node_by_uid( a.fromNode.uid, G ) new_to_node = find_node_by_uid( a.toNode.uid, G ) full_new_arcs_update = full_new_arcs_update.union( {Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, a.datum.flow))} ) G = DiGraph(full_new_nodes, full_new_arcs_update) return G 在上面, TOL是對網絡中的流量值進行四捨五入的一些容差值。 這是為了避免浮點計算的級聯不精確。 因此,例如,我使用TOL = 10來表示四捨五入到 10 位有效數字。
令K = augment(augmentingPath, H) ,則K表示maxFlowProblem的可行最大流量解決方案。 要使陳述成立,由K表示的流網絡必須具有可行的流(不違反容量約束或守恆約束。
原因如下:在上述方法中,添加到由有向圖K表示的新流網絡的每個節點要么是有向圖H中節點的精確副本,要么是在其n.datum.flowIn中添加了相同編號的節點n它的n.datum.flowOut 。 這意味著只要在H中滿足守恆約束,就在K中滿足守恆約束。 守恆約束得到滿足,因為我們明確檢查網絡中的任何新弧a是否具有a.datum.flow <= a.datum.capacity ; 因此,只要未修改地複製到K.setOfArcs中的集合H.setOfArcs中的弧不違反容量約束,則K不違反容量約束。
如果H不是最優的, get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem)也是如此。
原因如下:對於最大流問題maxFlowProblem的殘差圖G_f的有向圖表示中存在的增廣路徑augmentingPath ,那麼增廣augmentingPath上的最後一條弧a必須是“推”弧,並且它必須具有a.toNode == maxFlowProblem.terminalNode 。 增廣路徑定義為終止於最大流量問題的終端節點的增廣路徑。 根據殘差圖的定義,很明顯,殘差圖上的增廣路徑中的最後一條弧必須是“推”弧,因為增廣路徑中的任何“拉”弧b都將具有b.toNode == maxFlowProblem.terminalNode和b.fromNode != maxFlowProblem.terminalNode來自path的定義。 另外,從path的定義來看,很明顯終端節點只被augment方法修改過一次。 因此, augment只修改了一次maxFlowProblem.terminalNode.flowIn並增加了maxFlowProblem.terminalNode.flowIn的值,因為augmentingPath中的最後一條弧必須是導致在augment期間修改maxFlowProblem.terminalNode.flowIn的弧。 從適用於“推”弧的augment的定義來看, maxFlowProblem.terminalNode.flowIn只能增加,不能減少。
來自 Sedgewick 和 Wayne 的一些證明
Robert Sedgewich 和 Kevin Wayne 所著的 Algorithms,第四版這本書有一些非常有用的簡短證明(第 892-894 頁)。 我將在這裡重新創建它們,儘管我將使用適合以前定義的語言。 我的校樣標籤與 Sedgewick 書中的標籤相同。
命題 E:對於表示最大流量問題maxFlowProblem的可行最大流量解的任何有向圖H ,對於任何stCut get_stcut_flow(stCut,H,maxFlowProblem) = get_flow_value(H, maxFlowProblem) 。
證明:令stCut=stCut(maxFlowProblem.sourceNode,maxFlowProblem.terminalNode,set([a for a in H.setOfArcs if a.toNode == maxFlowProblem.terminalNode])) 。 命題 E對stCut直接從st flow value的定義成立。 假設我們希望將某個節點n從 s-partition ( get_first_part(stCut.cut, G) ) 移動到 t-partition (get_second_part(stCut.cut, G))中,為此我們需要更改stCut.cut ,這可能會改變stcut_flow = get_stcut_flow(stCut,H,maxFlowProblem)並使命題 E無效。 但是,讓我們看看stcut_flow的值在我們進行更改時將如何變化。 節點n處於平衡狀態,這意味著流入節點n的流量總和等於流出節點 n 的流量總和(這是H表示可行解所必需的)。 請注意,作為stcut_flow的一部分進入節點n的所有流都從 s 分區進入(從 t 分區直接或間接進入節點n的流不會被計入stcut_flow值,因為它的方向是錯誤的根據定義)。 此外,所有離開n的流最終將(直接或間接)流入終端節點(如前所述)。 當我們將節點n移動到 t-partition 中時,所有從 s-partition 進入n的流必須添加到新的stcut_flow值; 但是,必須從新的stcut_flow值中減去所有退出n的流; 直接進入 t-partition 的流部分被減去,因為該流現在位於新 t-partition 的內部並且不計為stcut_flow 。 從n流向 s-partition 中的節點的部分流也必須從stcut_flow中減去:在n移動到 t-partition 之後,這些流將從 t-partition 引導到 s-partition 等等不能在stcut_flow中考慮,因為這些流被刪除,流入 s-partition 並且必須減去這些流的總和,以及從 s-partition 到 t-partition 的流出(其中所有流量來自st 必須結束)必須減少相同的數量。 由於節點n在該過程之前處於平衡狀態,因此更新將在新的stcut_flow值中添加相同的值,因為它減去了因此在更新之後使命題 E為真。 然後,命題 E的有效性來自對 t 分區大小的歸納。
以下是一些示例流網絡,以幫助可視化命題 E成立的不太明顯的情況; 圖中,紅色區域表示 s-partition,藍色區域表示 t-partition,綠色弧線表示st cut 。 在第二幅圖中,節點A 和節點B 之間的流量增加,而流入終端節點t 的流量沒有變化。:

推論:任何st cut 的流量值都不能超過任何st cut的容量。
命題 F.(最大流,最小割定理):設f為st 流。 以下3個條件是等價的:
存在一個容量等於流
f的值的st 割。f是最大流量。沒有關於
f的增廣路徑。
條件 1 由推論暗示條件 2。 條件 2 意味著條件 3,因為增廣路徑的存在意味著存在具有較大值的流,這與f的最大值相矛盾。 條件 3 意味著條件 1:讓C_s是可以從s到達的所有節點的集合,在殘差圖中具有增廣路徑。 讓C_t是剩餘的arcs ,那麼t必須在C_t中(根據我們的假設)。 從C_s到C_t的弧然後形成一個st 切割,其中僅包含弧a其中a.datum.capacity = a.datum.flow或a.datum.flow = 0.0 。 如果不是這樣,那麼由具有剩餘剩餘容量的弧連接到C_s的節點將在集合C_s中,因為從s到這樣的節點會有一條增廣路徑。 通過st 割的流量等於st 割的容量(因為從C_s到C_t的弧的流量等於容量),也等於st 流量的值(根據命題 E )。
最大流最小割定理的這個陳述暗示了網絡中的流的早期陳述。
推論(整數屬性):當容量為整數時,存在一個整數值的最大流,Ford-Fulkerson 算法找到它。
證明:每條增廣路徑將流量增加一個正整數,即“推”弧和“拉”弧中未使用容量的最小值,所有這些都是正整數。
這證明了來自CLRS的Ford-Fulkerson 方法描述。 方法是不斷尋找增廣路徑,並maxFlowSolution augment更好的解決方案,直到沒有更多的增廣路徑意味著最新的最大流量解決方案是最優的。
從福特-富爾克森到埃德蒙茲-卡普
關於解決最大流量問題的其餘問題是:
應該如何構建增廣路徑?
如果我們使用實數而不是整數,該方法會終止嗎?
終止(如果終止)需要多長時間?
Edmonds-Karp 算法指定每個增廣路徑由殘差圖的廣度優先搜索(BFS)構造; 事實證明,上述第 1 點的決定也將迫使算法終止(第 2 點)並允許確定漸近時間和空間複雜度。
首先,這是一個BFS實現:
def bfs(sourceNode, terminalNode, G_f): G_f_as_dict = digraph_to_dict(G_f) parent_arcs = dict([]) visited = frozenset([]) deq = deque([sourceNode]) while len(deq) > 0: curr = deq.popleft() if curr == terminalNode: break for a in G_f_as_dict.get(curr): if (a.toNode not in visited): visited = visited.union(frozenset([a.toNode])) parent_arcs[a.toNode] = a deq.extend([a.toNode]) path = list([]) curr = terminalNode while(curr != sourceNode): if (curr not in parent_arcs): err_msg = 'No augmenting path from {} to {}.'.format(sourceNode.uid, terminalNode.uid) raise StopIteration(err_msg) path.extend([parent_arcs[curr]]) curr = parent_arcs[curr].fromNode path.reverse() test = deque([path[0].fromNode]) for a in path: if(test[-1] != a.fromNode): err_msg = 'Bad path: {}'.format(path) raise ValueError(err_msg) test.extend([a.toNode]) return path我使用了來自 python 集合模塊的雙端隊列。
為了回答上面的問題 2,我將解釋 Sedgewick 和 Wayne 的另一個證明:命題 G。具有N個節點和A弧的Edmonds-Karp算法中所需的增廣路徑數最多為NA/2 。 證明:每條增廣路徑都有一條瓶頸弧——一條從殘差圖中刪除的弧,因為它對應於一條“推”弧,該弧對應填充到容量,或者一個“拉”弧,通過該弧,流量變為 0。每次arc成為瓶頸arc ,通過它的任何增廣路徑的長度必須增加 2 倍。這是因為路徑中的每個節點可能只出現一次或根本不出現(根據path的定義),因為路徑正在從最短路徑到最長路徑進行探索,這意味著通過特定瓶頸節點的下一條路徑必須訪問至少一個節點,這意味著在我們到達節點之前路徑上還有另外 2 個弧。 由於增廣路徑的長度最多為N每條弧最多可以在N/2增廣路徑上,並且增廣路徑的總數最多為NA/2 。
Edmonds-Karp 算法在O(NA^2)中執行。 如果在算法期間最多探索NA/2條路徑,並且使用BFS探索每條路徑是N+A則乘積的最重要項,因此漸近複雜度為O(NA^2) 。
讓mfp成為maxFlowProblemState 。
def edmonds_karp(mfp): sid, tid = mfp.sourceNodeUid, mfp.terminalNodeUid no_more_paths = False loop_count = 0 while(not no_more_paths): residual_digraph = get_residual_graph_of(mfp.G) try: asource = find_node_by_uid(mfp.sourceNodeUid, residual_digraph) aterm = find_node_by_uid(mfp.terminalNodeUid, residual_digraph) apath = bfs(asource, aterm, residual_digraph) paint_mfp_path(mfp, loop_count, apath) G = augment(apath, mfp.G) s = find_node_by_uid(sid, G) t = find_node_by_uid(tid, G) mfp = MaxFlowProblemState(G, s.uid, t.uid, mfp.maxFlowProblemStateUid) loop_count += 1 except StopIteration as e: no_more_paths = True return mfp 上面的版本效率低下,並且比O(NA^2)複雜度更差,因為它每次都構造一個新的最大流量解決方案和一個新的殘差圖(而不是隨著算法的進步而修改現有的有向圖)。 為了獲得真正的O(NA^2)解決方案,算法必須同時維護表示最大流問題狀態的有向圖及其相關的殘差圖。 因此算法必須避免不必要地迭代弧和節點,並僅在必要時更新它們的值和殘差圖中的關聯值。
為了編寫更快的Edmonds Karp算法,我重寫了上面的幾段代碼。 我希望通過生成新有向圖的代碼有助於理解正在發生的事情。 在快速算法中,我使用了一些我不想詳細介紹的新技巧和 Python 數據結構。 我會說a.fromNode和a.toNode現在被視為字符串和節點的 uid。 對於此代碼,讓mfps為maxFlowProblemState
import uuid def fast_get_mfps_flow(mfps): flow_from_s = {n for n in mfps.G.setOfNodes if n.uid == mfps.sourceNodeUid}.pop().datum.flowOut flow_to_t = {n for n in mfps.G.setOfNodes if n.uid == mfps.terminalNodeUid}.pop().datum.flowIn if( (flow_to_t - flow_from_s) > 0.00): raise Exception('Infeasible st flow') return flow_to_t def fast_e_k_preprocess(G): G = strip_flows(G) get = dict({}) get['nodes'] = dict({}) get['node_to_node_capacity'] = dict({}) get['node_to_node_flow'] = dict({}) get['arcs'] = dict({}) get['residual_arcs'] = dict({}) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) get['nodes'][a.fromNode.uid] = a.fromNode get['nodes'][a.toNode.uid] = a.toNode lark = Arc(a.fromNode.uid, a.toNode.uid, FlowArcDatumWithUid(a.datum.capacity, a.datum.flow, uuid.uuid4())) if(a.fromNode.uid not in get['arcs']): get['arcs'][a.fromNode.uid] = dict({a.toNode.uid : dict({lark.datum.uid : lark})}) else: if(a.toNode.uid not in get['arcs'][a.fromNode.uid]): get['arcs'][a.fromNode.uid][a.toNode.uid] = dict({lark.datum.uid : lark}) else: get['arcs'][a.fromNode.uid][a.toNode.uid][lark.datum.uid] = lark for a in G.setOfArcs: if a.toNode.uid not in get['arcs']: get['arcs'][a.toNode.uid] = dict({}) for n in get['nodes']: get['residual_arcs'][n] = dict() get['node_to_node_capacity'][n] = dict() get['node_to_node_flow'][n] = dict() for u in get['nodes']: n_to_u_cap_sum = sum(a.datum.capacity for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) ) n_to_u_flow_sum = sum(a.datum.flow for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) ) if(n_to_u_cap_sum > n_to_u_flow_sum): flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL) get['residual_arcs'][n][u] = Arc(n,u,ResidualDatum(flow, 'push')) if(n_to_u_flow_sum > 0.0): flow = round(n_to_u_flow_sum, TOL) get['residual_arcs'][u][n] = Arc(u,n,ResidualDatum(flow, 'pull')) get['node_to_node_capacity'][n][u] = n_to_u_cap_sum get['node_to_node_flow'][n][u] = n_to_u_flow_sum return get def fast_bfs(sid, tid, get): parent_of = dict([]) visited = frozenset([]) deq = coll.deque([sid]) while len(deq) > 0: n = deq.popleft() if n == tid: break for u in get['residual_arcs'][n]: if (u not in visited): visited = visited.union(frozenset({u})) parent_of[u] = get['residual_arcs'][n][u] deq.extend([u]) path = list([]) curr = tid while(curr != sid): if (curr not in parent_of): err_msg = 'No augmenting path from {} to {}.'.format(sid, curr) raise StopIteration(err_msg) path.extend([parent_of[curr]]) curr = parent_of[curr].fromNode path.reverse() return path def fast_edmonds_karp(mfps): sid, tid = mfps.sourceNodeUid, mfps.terminalNodeUid get = fast_e_k_preprocess(mfps.G) no_more_paths, loop_count = False, 0 while(not no_more_paths): try: apath = fast_bfs(sid, tid, get) get = fast_augment(apath, get) loop_count += 1 except StopIteration as e: no_more_paths = True nodes = frozenset(get['nodes'].values()) arcs = frozenset({}) for from_node in get['arcs']: for to_node in get['arcs'][from_node]: for arc in get['arcs'][from_node][to_node]: arcs |= frozenset({get['arcs'][from_node][to_node][arc]}) G = DiGraph(nodes, arcs) mfps = MaxFlowProblemState(G, sourceNodeUid=sid, terminalNodeUid=tid, maxFlowProblemStateUid=mfps.maxFlowProblemStateUid) return mfps 這是該算法如何從上面解決示例流網絡的可視化。 可視化顯示步驟,因為它們反映在代表最新流網絡的有向圖G中,並且它們反映在該流網絡的殘差圖中。 殘差圖中的增廣路徑顯示為紅色路徑,表示受給定增廣路徑影響的節點和弧集的問題的有向圖以綠色突出顯示。 在每種情況下,我都會突出顯示圖表中將要更改的部分(紅色或綠色),然後在更改後顯示圖表(僅以黑色顯示)。
這是該算法如何解決不同示例流網絡的另一個可視化。 請注意,這個使用實數並包含多個具有相同fromNode和toNode值的弧。
**另請注意,由於帶有“拉”殘差數據的弧可能是增廣路徑的一部分,因此在流動網絡的有向圖中受影響的節點_可能不在G! .
二分圖
假設我們有一個有向圖G ,如果可以將G.setOfNodes中的節點劃分為兩個集合( part_1和part_2 ),則G是二分的,這樣對於part_1中的任何弧a , G.setOfArcs中的a.fromNode和a.toNode在part_1中。 a.fromNode中的part_2和a.toNode中的part_2也不是真的。
換句話說,如果 G 可以劃分為兩組節點,則G是二分的,使得每條弧都必須將一組中的節點連接到另一組中的節點。
測試二分法
假設我們有一個有向圖G ,我們想測試它是否是二分的。 我們可以在O(|G.setOfNodes|+|G.setOfArcs|)中通過貪婪地將圖形著色為兩種顏色來做到這一點。
首先,我們需要生成一個新的有向圖H 。 該圖將具有與G相同的節點集,但它將具有比G更多的弧。 G中的每個弧a將在H中創建 2 個弧; 第一條弧與a相同,第二條弧反轉a的導向器( b = Arc(a.toNode,a.fromNode,a.datum) )。
Bipartition = coll.namedtuple('Bipartition',['firstPart', 'secondPart', 'G']) def bipartition(G): nodes = frozenset(G.setOfNodes arcs = frozenset(G.setOfArcs) arcs = arcs.union( frozenset( {Arc(a.toNode,a.fromNode,a.datum) for a in G.setOfArcs} ) ) H = DiGraph(nodes, arcs) H_as_dict = digraph_to_dict(H) color = dict([]) some_node = next(iter(H.setOfNodes)) deq = coll.deque([some_node]) color[some_node] = -1 while len(deq) > 0: curr = deq.popleft() for a in H_as_dict.get(curr): if (a.toNode not in color): color[a.toNode] = -1*color[curr] deq.extend([a.toNode]) elif(color[curr] == color[a.toNode]): print(curr,a.toNode) raise Exception('Not Bipartite.') firstPart = frozenset( {n for n in color if color[n] == -1 } ) secondPart = frozenset( {n for n in color if color[n] == 1 } ) if( firstPart.union(secondPart) != G.setOfNodes ): raise Exception('Not Bipartite.') return Bipartition(firstPart, secondPart, G)匹配和最大匹配
假設我們有一個有向圖G並且matching是來自G.setOfArcs的弧的子集。 matching是匹配中任意兩個弧a和b的matching : len(frozenset( {a.fromNode} ).union( {a.toNode} ).union( {b.fromNode} ).union( {b.toNode} )) == 4 。 換句話說,匹配中沒有兩條弧共享一個節點。
如果G中沒有其他匹配的alt_matching使得len(matching) < len(alt_matching) ,則匹配matching是最大匹配。 換句話說, matching是最大匹配,如果它是來自G.setOfArcs的最大弧集,仍然滿足匹配的定義(添加任何不在匹配中的弧都會破壞匹配定義)。
如果 G.setOfArcs 中的每個節點n在G.setOfArcs a.fromNode == n or a.toNode == n的matching中存在一個弧a ,則最大匹配matching是完美匹配。
最大二分匹配
最大二分匹配是二分的有向圖G上的最大匹配。
給定G是二分的,尋找最大二分匹配的問題可以轉化為可用Edmonds-Karp算法求解的最大流量問題,然後最大二分匹配可以從最大流量問題的解中恢復。
讓bipartition是G的一個二分。
為此,我需要生成一個新的有向圖( H ),其中包含一些新節點( H.setOfNodes )和一些新弧( H.setOfArcs )。 H.setOfNodes包含G.setOfNodes中的所有節點以及另外兩個節點s (源節點)和t (終端節點)。
H.setOfArcs將為每個G.setOfArcs包含一個弧。 如果弧a在G.setOfArcs中, a.fromNode在bipartition.firstPart中, a.toNode在bipartition.secondPart中,則在H中包含a (添加FlowArcDatum(1,0) )。
如果a.fromNode在bipartition.secondPart中並且a.toNode在bipartition.firstPart中,則在 H.setOfArcs 中包含 Arc(a.toNode,a.fromNode, H.setOfArcs Arc(a.toNode,a.fromNode,FlowArcDatum(1,0)) 。
二分圖的定義確保沒有弧連接兩個節點位於同一分區中的任何節點。 H.setOfArcs還包含從節點s到bipartition.firstPart中每個節點的弧。 最後, H.setOfArcs包含bipartition.secondPart中的每個節點到節點t的弧。 對於 H.setOfArcs 中a所有H.setOfArcs a.datum.capacity = 1 。
首先將G.setOfNodes中的節點劃分為兩個不相交的集合( part1和part2 ),這樣G.setOfArcs中的任何弧都不會從一個集合指向同一個集合(此分區是可能的,因為G是二分的)。 接下來,將G.setOfArcs中從part1指向part2的所有弧添加到H.setOfArcs中。 然後創建單個源節點s和單個終端節點t並創建更多弧
然後,構造一個maxFlowProblemState 。
def solve_mbm( bipartition ): s = Node(uuid.uuid4(), FlowNodeDatum(0,0)) t = Node(uuid.uuid4(), FlowNodeDatum(0,0)) translate = {} arcs = frozenset([]) for a in bipartition.G.setOfArcs: if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) ): fark = Arc(a.fromNode,a.toNode,FlowArcDatum(1,0)) arcs = arcs.union({fark}) translate[frozenset({a.fromNode.uid,a.toNode.uid})] = a elif ( (a.toNode in bipartition.firstPart) and (a.fromNode in bipartition.secondPart) ): bark = Arc(a.toNode,a.fromNode,FlowArcDatum(1,0)) arcs = arcs.union({bark}) translate[frozenset({a.fromNode.uid,a.toNode.uid})] = a arcs1 = frozenset( {Arc(s,n,FlowArcDatum(1,0)) for n in bipartition.firstPart } ) arcs2 = frozenset( {Arc(n,t,FlowArcDatum(1,0)) for n in bipartition.secondPart } ) arcs = arcs.union(arcs1.union(arcs2)) nodes = frozenset( {Node(n.uid,FlowNodeDatum(0,0)) for n in bipartition.G.setOfNodes} ).union({s}).union({t}) G = DiGraph(nodes, arcs) mfp = MaxFlowProblemState(G, s.uid, t.uid, 'bipartite') result = edmonds_karp(mfp) lookup_set = {a for a in result.G.setOfArcs if (a.datum.flow > 0) and (a.fromNode.uid != s.uid) and (a.toNode.uid != t.uid)} matching = {translate[frozenset({a.fromNode.uid,a.toNode.uid})] for a in lookup_set} return matching最小節點覆蓋
有向圖G中的節點覆蓋是來自G.setOfNodes的一組節點( cover ),因此對於G.setOfArcs的任何弧a ,這必須為真: (a.fromNode in cover) or (a.toNode in cover) 。
最小節點覆蓋是圖中仍然是節點覆蓋的最小可能節點集。 Konig 定理指出,在二分圖中,該圖上最大匹配的大小等於最小節點覆蓋的大小,它表明節點覆蓋如何從最大匹配中恢復:
假設我們有bipartition bipartition和最大匹配matching 。 定義一個新的有向圖H , H.setOfNodes=G.setOfNodes , H.setOfArcs中的弧是兩個集合的並集。
第一組是matching的弧a ,如果a.fromNode in bipartition.firstPart的 a.fromNode 和a.toNode in bipartition.secondPart中的 a.toNode 則在創建的弧中交換a.fromNode和a.toNode給這樣的弧a.datum.inMatching=True屬性表示它們是從匹配的弧中派生的。
第二組是弧a NOT in matching ,如果a.fromNode in bipartition.secondPart和a.toNode in bipartition.firstPart則a.fromNode和a.toNode在創建的弧中交換(給這樣的弧a a.datum.inMatching=False屬性)。
接下來,從bipartition.firstPart中的每個節點n開始運行深度優先搜索(DFS),對於matching的任何弧a ,它既不是n == a.fromNode也不是n == a.toNodes 。 在 DFS 期間,訪問了一些節點,而一些沒有訪問(將此信息存儲在n.datum.visited字段中)。 最小節點覆蓋是節點{a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (a.fromNode.datum.visited) ) }和節點{a.fromNode for a in H.setOfArcs if (a.datum.inMatching) and (not a.toNode.datum.visited)} 。
這可以通過矛盾證明從最大匹配引導到最小節點覆蓋,取一些本應未被覆蓋的弧a並考慮關於a.fromNode和a.toNode是否屬於的所有四種情況(無論是toNode還是fromNode ) 到匹配matching中的任何弧。 由於DFS訪問節點的順序以及matching是最大匹配的事實,每種情況都會導致矛盾。
假設我們有一個函數來執行這些步驟,並在給定有向圖G和最大匹配matching時返回包含最小節點覆蓋的節點集:
ArcMatchingDatum = coll.namedtuple('ArcMatchingDatum', ['inMatching' ]) NodeMatchingDatum = coll.namedtuple('NodeMatchingDatum', ['visited']) def dfs_helper(snodes, G): sniter, do_stop = iter(snodes), False visited, visited_uids = set(), set() while(not do_stop): try: stack = [ next(sniter) ] while len(stack) > 0: curr = stack.pop() if curr.uid not in visited_uids: visited = visited.union( frozenset( {Node(curr.uid, NodeMatchingDatum(False))} ) ) visited_uids = visited.union(frozenset({curr.uid})) succ = frozenset({a.toNode for a in G.setOfArcs if a.fromNode == curr}) stack.extend( succ.difference( frozenset(visited) ) ) except StopIteration as e: stack, do_stop = [], True return visited def get_min_node_cover(matching, bipartition): nodes = frozenset( { Node(n.uid, NodeMatchingDatum(False)) for n in bipartition.G.setOfNodes} ) G = DiGraph(nodes, None) charcs = frozenset( {a for a in matching if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) )} ) arcs0 = frozenset( { Arc(find_node_by_uid(a.toNode.uid,G), find_node_by_uid(a.fromNode.uid,G), ArcMatchingDatum(True) ) for a in charcs } ) arcs1 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid,G), find_node_by_uid(a.toNode.uid,G), ArcMatchingDatum(True) ) for a in matching.difference(charcs) } ) not_matching = bipartition.G.setOfArcs.difference( matching ) charcs = frozenset( {a for a in not_matching if ( (a.fromNode in bipartition.secondPart) and (a.toNode in bipartition.firstPart) )} ) arcs2 = frozenset( { Arc(find_node_by_uid(a.toNode.uid,G), find_node_by_uid(a.fromNode.uid,G), ArcMatchingDatum(False) ) for a in charcs } ) arcs3 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid,G), find_node_by_uid(a.toNode.uid,G), ArcMatchingDatum(False) ) for a in not_matching.difference(charcs) } ) arcs = arcs0.union(arcs1.union(arcs2.union(arcs3))) G = DiGraph(nodes, arcs) bip = Bipartition({find_node_by_uid(n.uid,G) for n in bipartition.firstPart},{find_node_by_uid(n.uid,G) for n in bipartition.secondPart},G) match_from_nodes = frozenset({find_node_by_uid(a.fromNode.uid,G) for a in matching}) match_to_nodes = frozenset({find_node_by_uid(a.toNode.uid,G) for a in matching}) snodes = bip.firstPart.difference(match_from_nodes).difference(match_to_nodes) visited_nodes = dfs_helper(snodes, bip.G) not_visited_nodes = bip.G.setOfNodes.difference(visited_nodes) H = DiGraph(visited_nodes.union(not_visited_nodes), arcs) cover1 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (a.fromNode.datum.visited) ) } ) cover2 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (not a.toNode.datum.visited) ) } ) min_cover_nodes = cover1.union(cover2) true_min_cover_nodes = frozenset({find_node_by_uid(n.uid, bipartition.G) for n in min_cover_nodes}) return min_cover_nodes線性分配問題
線性分配問題包括在加權二分圖中找到最大權重匹配。
像本文開頭的問題可以表示為線性分配問題。 給定一組工人、一組任務和一個函數,該函數指示將一名工人分配給一項任務的盈利能力,我們希望最大化我們所做的所有分配的總和; 這是一個線性分配問題。
假設任務和工人的數量是相等的,儘管我會證明這個假設很容易消除。 在實現中,我用屬性a.datum.weight為弧a表示弧權重。
WeightArcDatum = namedtuple('WeightArcDatum', [weight])Kuhn-Munkres 算法
Kuhn-Munkres 算法解決了線性分配問題。 一個好的實現可能需要O(N^{4})時間,(其中N是表示問題的有向圖中的節點數)。 一個更容易解釋的實現需要O(N^{5}) (用於重新生成DiGraphs的版本)和O(N^{4})用於(用於維護DiGraphs的版本)。 這類似於Edmonds-Karp算法的兩種不同實現。
對於此描述,我只使用完整的二分圖(其中一半節點位於二分之一部分而另一半位於第二部分的那些)。 在工人中,任務動機意味著有與任務一樣多的工人。
這似乎是一個重要的條件(如果這些集合不相等怎麼辦!)但很容易解決這個問題; 我在最後一節中討論瞭如何做到這一點。
此處描述的算法版本使用了有用的零權重弧概念。 不幸的是,這個概念只有在我們解決最小化問題時才有意義(如果我們不是最大化我們的工人任務分配的利潤,而是最小化此類分配的成本)。
幸運的是,通過將每個弧a權重設置為Ma.datum.weight ,很容易將最大線性分配問題轉化為最小線性分配問題,其中M=max({a.datum.weight for a in G.setOfArcs}) . 原始最大化問題的解決方案將與弧權重更改後的解決方案最小化問題相同。 因此,對於其餘部分,假設我們進行了此更改。
Kuhn-Munkres 算法通過未加權二分圖中的一系列最大匹配來解決加權二分圖中的最小權重匹配。 如果我們在線性分配問題的有向圖表示上找到一個完美匹配,並且如果匹配中每條弧的權重為零,那麼我們找到了最小權重匹配,因為這個匹配表明有向圖中的所有節點都已經由具有最低可能成本的弧匹配(根據先前的定義,沒有成本可以低於 0)。
不能將其他弧添加到匹配中(因為所有節點都已匹配)並且不應從匹配中刪除任何弧,因為任何可能的替換弧將至少具有相同的權重值。
如果我們找到G的子圖的最大匹配,它只包含零權重弧,並且它不是完美匹配,我們沒有完整的解決方案(因為匹配不完美)。 但是,我們可以通過改變G.setOfArcs中弧的權重來生成一個新的有向圖H ,使得新的 0 權重弧出現,並且H的最優解與G的最優解相同。 由於我們保證在每次迭代中至少產生一個零權重弧,我們保證我們將在不超過|G.setOfNodes|^{2}=N^{2}次這樣的迭代中達到完美匹配。
假設在bipartition bipartition中, bipartition.firstPart包含代表worker 的節點, bipartition.secondPart代表代表task 的節點。
該算法首先生成一個新的有向圖H 。 H.setOfNodes = G.setOfNodes 。 H中的一些弧是從bipartition.firstPart中的節點n生成的。 每個這樣的節點n為bipartition.G.setOfArcs中的每個弧a在H.setOfArcs中生成一個弧b其中a.fromNode = n或a.toNode = n , b=Arc(a.fromNode, a.toNode, a.datum.weight - z)其中z=min(x.datum.weight for x in G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )) 。
H中的更多弧是從bipartition.secondPart中的節點n生成的。 每個這樣的節點n為bipartition.G.setOfArcs中的每個弧a在H.setOfArcs中生成一個弧b其中a.fromNode = n或a.toNode = n , b=Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z))其中z=min(x.datum.weight for x in G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )) 。
KMA:接下來,形成一個新的有向圖K ,它僅由H的零權重弧和入射在這些弧上的節點組成。 在K中的節點上形成bipartition ,然後使用solve_mbm( bipartition )得到K上的最大匹配( matching )。 如果matching是H中的完美匹配( matching中的弧都發生在H.setOfNodes中的所有節點上),則matching是線性分配問題的最優解。
否則,如果matching不完美,則使用node_cover = get_min_node_cover(matching, bipartition(K))生成K的最小節點覆蓋。 接下來,定義z=min({a.datum.weight for a in H.setOfArcs if a not in node_cover}) 。 為 H.setOfArcs 中的 a 定義nodes = H.setOfNodes , arcs1 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh-z)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) and (a.toNode not in node_cover)} , arcs2 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) != (a.toNode not in node_cover)} , arcs3 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh+z)) for a in H.setOfArcs if ( (a.fromNode in node_cover) and (a.toNode in node_cover)} . 上一個表達式中的!=符號充當 XOR 運算符。然後arcs = arcs1.union(arcs2.union(arcs3)) 。接下來, H=DiGraph(nodes,arcs) 。返回標籤KMA 。算法一直持續到產生完美匹配為止。這種匹配也是線性分配問題的解決方案。
def kuhn_munkres( bipartition ): nodes = bipartition.G.setOfNodes arcs = frozenset({}) for n in bipartition.firstPart: z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} ) arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) ) for n in bipartition.secondPart: z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} ) arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) ) H = DiGraph(nodes, arcs) assignment, value = dict({}), 0 not_done = True while( not_done ): zwarcs = frozenset( {a for a in H.setOfArcs if a.datum.weight == 0} ) znodes = frozenset( {n.fromNode for n in zwarcs} ).union( frozenset( {n.toNode for n in zwarcs} ) ) K = DiGraph(znodes, zwarcs) k_bipartition = bipartition(K) matching = solve_mbm( k_bipartition ) mnodes = frozenset({a.fromNode for a in matching}).union(frozenset({a.toNode for a in matching})) if( len(mnodes) == len(H.setOfNodes) ): for a in matching: assignment[ a.fromNode.uid ] = a.toNode.uid value = sum({a.datum.weight for a in matching}) not_done = False else: node_cover = get_min_node_cover(matching, bipartition(K)) z = min( frozenset( {a.datum.weight for a in H.setOfArcs if a not in node_cover} ) ) nodes = H.setOfNodes arcs1 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh-z)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) and (a.toNode not in node_cover)} ) arcs2 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) != (a.toNode not in node_cover)} ) arcs3 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh+z)) for a in H.setOfArcs if ( (a.fromNode in node_cover) and (a.toNode in node_cover)} ) arcs = arcs1.union(arcs2.union(arcs3)) H = DiGraph(nodes,arcs) return value, assignment 這個實現是O(N^{5}) ,因為它在每次迭代中生成一個新的最大匹配matching ; 與Edmonds-Karp的前兩個實現類似,可以修改此算法,以便跟踪匹配並智能地使其適應每次迭代。 完成後,複雜度變為O(N^{4}) 。 該算法的更高級和更新版本(需要一些更高級的數據結構)可以在O(N^{3})中運行。 上面更簡單的實現和更高級的實現的細節可以在這篇博文中找到。
弧權重上的任何操作都不會修改算法返回的最終分配。 原因如下:由於我們的輸入圖始終是完整的二分圖,因此解決方案必須通過這兩個節點之間的弧將一個分區中的每個節點映射到第二個分區中的另一個節點。 請注意,對弧權重執行的操作永遠不會改變在任何特定節點上發生的弧的順序(按權重排序)。
因此,當算法在完美的完全二分匹配處終止時,每個節點都被分配一個零權重弧,因為在算法期間來自該節點的弧的相對順序沒有改變,並且由於零權重弧是最便宜的可能弧,並且完美的完全二分匹配保證每個節點都存在一個這樣的弧。 這意味著生成的解決方案確實與原始線性分配問題的解決方案相同,而無需修改弧權重。
不平衡的分配
看起來該算法非常有限,因為如上所述,它僅在完整的二分圖上運行(其中一半節點位於二分之一部分,另一半位於第二部分)。 在工人中,任務動機意味著有與任務一樣多的工人(似乎非常有限)。
但是,有一個簡單的轉換可以消除此限制。 假設工人比任務少,我們添加一些虛擬工人(足以使結果圖成為完整的二分圖)。 Each dummy worker has an arc directed from the worker to each of the tasks. Each such arc has weight 0 (placing it in a matching gives no added profit). After this change the graph is a complete bipartite graph which we can solve for. Any task assigned a dummy worker is not initiated.
Suppose that there are more tasks than workers. We add some dummy tasks (enough to make the resulting graph a complete bipartite graph ). Each dummy task has an arc directed from each worker to the dummy task. Each such arc has a weight of 0 (placing it in a matching gives no added profit). After this change the graph is a complete bipartite graph which we can solve for. Any worker assigned to dummy task is not employed during the period.
A Linear Assignment Example
Finally, let's do an example with the code I've been using. I'm going to modify the example problem from here. We have 3 tasks: we need to clean the bathroom , sweep the floor , and wash the windows .
The workers available to use are Alice , Bob , Charlie , and Diane . Each of the workers gives us the wage they require per task. Here are the wages per worker:
| Clean the Bathroom | Sweep the Floor | Wash the Windows | |
|---|---|---|---|
| 愛麗絲 | 2美元 | 3 美元 | 3 美元 |
| 鮑勃 | 3 美元 | 2美元 | 3 美元 |
| 查理 | 3 美元 | 3 美元 | 2美元 |
| 黛安 | 9 美元 | 9 美元 | 1美元 |
If we want to pay the least amount of money, but still get all the tasks done, who should do what task? Start by introducing a dummy task to make the digraph representing the problem bipartite.
| Clean the Bathroom | Sweep the Floor | Wash the Windows | 沒做什麼 | |
|---|---|---|---|---|
| 愛麗絲 | 2美元 | 3 美元 | 3 美元 | $0 |
| 鮑勃 | 3 美元 | 2美元 | 3 美元 | $0 |
| 查理 | 3 美元 | 3 美元 | 2美元 | $0 |
| 黛安 | 9 美元 | 9 美元 | 1美元 | $0 |
Supposing that the problem is encoded in a digraph , then kuhn_munkres( bipartition(G) ) will solve the problem and return the assignment. It's easy to verify that the optimal (lowest cost) assignment will cost $5.
Here's a visualization of the solution the code above generates:
這就對了。 You now know everything you need to know about the linear assignment problem.
You can find all of the code from this article on GitHub.
進一步閱讀 Toptal 工程博客:
- 使用 Python/NetworkX 進行圖形數據科學
