最大流量和线性分配问题
已发表: 2022-03-11这里有一个问题:您的企业指派承包商来履行合同。 您查看您的名册并确定哪些承包商可用于一个月的参与,您查看您的可用合同以查看其中哪些是为期一个月的任务。 鉴于您知道每个承包商履行每份合同的效率如何,您如何分配承包商以最大限度地提高该月的整体效率?
这是分配问题的一个例子,这个问题可以用经典的匈牙利算法来解决。
匈牙利算法(也称为 Kuhn-Munkres 算法)是一种多项式时间算法,它在加权二分图中最大化权重匹配。 在这里,承包商和合同可以建模为二分图,它们的有效性是承包商和合同节点之间的边的权重。
在本文中,您将了解使用 Edmonds-Karp 算法解决线性分配问题的匈牙利算法的实现。 您还将了解 Edmonds-Karp 算法如何是对 Ford-Fulkerson 方法的轻微修改,以及这种修改的重要性。
最大流量问题
最大流量问题本身可以非正式地描述为将一些流体或气体通过管道网络从单一来源移动到单一终端的问题。 这是在假设网络中的压力足以确保流体或气体不会滞留在任何长度的管道或管道配件(不同长度管道相遇的地方)的情况下完成的。
通过对图进行某些更改,可以将分配问题转化为最大流问题。
预赛
解决这些问题所需的想法出现在许多数学和工程学科中,通常相似的概念有不同的名称并以不同的方式表示(例如,邻接矩阵和邻接列表)。 由于这些想法非常深奥,因此要选择如何为任何给定设置定义这些概念。
本文不会假设除了一些介绍性的集合论之外的任何先验知识。
这篇文章中的实现将问题表示为有向图(有向图)。
有向图
有向图有两个属性, setOfNodes和setOfArcs 。 这两个属性都是集合(无序集合)。 在这篇文章的代码块中,我实际上使用的是 Python 的 freezeset,但这个细节并不是特别重要。
DiGraph = namedtuple('DiGraph', ['setOfNodes','setOfArcs'])
(注意:本文以及本文中的所有其他代码都是用 Python 3.6 编写的。)
节点
一个节点n
由两个属性组成:
n.uid
:唯一标识符。这意味着对于任何两个节点
x
和y
,
x.uid != y.uid
-
n.datum
:这代表任何数据对象。
Node = namedtuple('Node', ['uid','datum'])
弧线
弧a
由三个属性组成:
a.fromNode
:这是一个node ,如上定义。a.toNode
:这是一个node ,如上定义。a.datum
:这代表任何数据对象。
Arc = namedtuple('Arc', ['fromNode','toNode','datum'])
有向图中的弧集表示有向图中节点上的二元关系。 arc a
的存在意味着a.fromNode
和a.toNode
之间存在关系。
在有向图中(相对于无向图), a.fromNode
和a.toNode
之间存在关系并不意味着a.toNode
和a.fromNode
之间存在类似的关系。
这是因为在无向图中,所表达的关系不一定是对称的。
有向图
节点和弧可用于定义有向图,但为方便起见,在下面的算法中,有向图将使用字典来表示。
这是一种可以将上面的图形表示转换为类似于此的字典表示的方法:
def digraph_to_dict(G): G_as_dict = dict([]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) pdg(G) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) pdg(G) raise KeyError(err_msg) G_as_dict[a.fromNode] = (G_as_dict[a.fromNode].union(frozenset([a]))) if (a.fromNode in G_as_dict) else frozenset([a]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if a.toNode not in G_as_dict: G_as_dict[a.toNode] = frozenset([]) return G_as_dict
这是另一个将其转换为字典的字典,另一个有用的操作:
def digraph_to_double_dict(G): G_as_dict = dict([]) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.fromNode not in G_as_dict): G_as_dict[a.fromNode] = dict({a.toNode : frozenset([a])}) else: if(a.toNode not in G_as_dict[a.fromNode]): G_as_dict[a.fromNode][a.toNode] = frozenset([a]) else: G_as_dict[a.fromNode][a.toNode] = G_as_dict[a.fromNode][a.toNode].union(frozenset([a])) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if a.toNode not in G_as_dict: G_as_dict[a.toNode] = dict({}) return G_as_dict
当文章谈到由字典表示的有向图时,它会提到G_as_dict
来引用它。
有时,通过它的uid
(唯一标识符)从有向图G
中获取节点会很有帮助:
def find_node_by_uid(find_uid, G): nodes = {n for n in G.setOfNodes if n.uid == find_uid} if(len(nodes) != 1): err_msg = 'Node with uid {find_uid!s} is not unique.'.format(**locals()) raise KeyError(err_msg) return nodes.pop()
在定义图时,人们有时使用术语节点和顶点来指代同一个概念。 术语arc和 edge 也是如此。
Python 中两种流行的图形表示是一种使用字典,另一种使用对象来表示图形。 这篇文章中的表示介于这两种常用表示之间。
这是我的有向图表示。 有很多喜欢的,但这个是我的。
步行和路径
令S_Arcs
是有向图G
中弧的有限序列(有序集合),使得如果a
是S_Arcs
中除最后一个以外的任何弧,并且b
在序列中跟随a
,则必须有一个节点n = a.fromNode
in G.setOfNodes
使得a.toNode = b.fromNode
。
从S_Arcs
中的第一个弧开始,一直持续到S_Arcs
中的最后一个弧,收集(按遇到的顺序)在S_Arcs
中每两个连续弧之间定义的所有节点n
。 标记在此操作期间收集的有序节点集合S_{Nodes}
。
def path_arcs_to_nodes(s_arcs): s_nodes = list([]) arc_it = iter(s_arcs) step_count = 0 last = None try: at_end = False last = a1 = next(arc_it) while (not at_end): s_nodes += [a1.fromNode] last = a2 = next(arc_it) step_count += 1 if(a1.toNode != a2.fromNode): err_msg = "Error at index {step_count!s} of Arc sequence.".format(**locals()) raise ValueError(err_msg) a1 = a2 except StopIteration as e: at_end = True if(last is not None): s_nodes += [last.toNode] return s_nodes
如果任何节点在序列
S_Nodes
中出现不止一次,则调用S_Arcs
a Walk on digraphG
。否则,在有向图
G
上调用S_Arcs
从list(S_Nodes)[0]
到list(S_Nodes)[-1]
的路径。
源节点
如果s
在G.setOfNodes
中并且G.setOfArcs
不包含满足a.toNode = s
弧a
,则将节点s
称为有向图G
中的源节点。
def source_nodes(G): to_nodes = frozenset({a.toNode for a in G.setOfArcs}) sources = G.setOfNodes.difference(to_nodes) return sources
终端节点
如果t
在G.setOfNodes
中并且G.setOfArcs
不包含满足a.fromNode = t
的弧a
,则将节点t
称为有向图G
中的终端节点。
def terminal_nodes(G): from_nodes = frozenset({a.fromNode for a in G.setOfArcs}) terminals = G.setOfNodes.difference(from_nodes) return terminals
削减和 st 削减
连通有向图G
的cut
是G.setOfArcs
的弧子集,它划分G
中的节点集G.setOfNodes
。 如果 G.setOfNodes 中的每个节点n
并且在G.setOfNodes
中至少有一个弧a
使得n = a.fromNode
G.setOfArcs
n = a.toNode
,但a.fromNode != a.toNode
,则G
是连接的。
Cut = namedtuple('Cut', ['setOfCutArcs'])
上面的定义指的是arcs的子集,但它也可以定义G.setOfNodes
的节点的分区。
对于函数predecessors_of
和successors_of
, n
是有向图G
的集合G.setOfNodes中的一个节点, cut
是G
的一个cut :
def cut_predecessors_of(n, cut, G): allowed_arcs = G.setOfArcs.difference(frozenset(cut.setOfCutArcs)) predecessors = frozenset({}) previous_count = len(predecessors) reach_fringe = frozenset({n}) keep_going = True while( keep_going ): reachable_from = frozenset({a.fromNode for a in allowed_arcs if (a.toNode in reach_fringe)}) reach_fringe = reachable_from predecessors = predecessors.union(reach_fringe) current_count = len(predecessors) keep_going = current_count - previous_count > 0 previous_count = current_count return predecessors def cut_successors_of(n, cut, G): allowed_arcs = G.setOfArcs.difference(frozenset(cut.setOfCutArcs)) successors = frozenset({}) previous_count = len(successors) reach_fringe = frozenset({n}) keep_going = True while( keep_going ): reachable_from = frozenset({a.toNode for a in allowed_arcs if (a.fromNode in reach_fringe)}) reach_fringe = reachable_from successors = successors.union(reach_fringe) current_count = len(successors) keep_going = current_count - previous_count > 0 previous_count = current_count return successors
令cut
为有向图G
的一个剪切。
def get_first_part(cut, G): firstPartFringe = frozenset({a.fromNode for a in cut.setOfCutArcs}) firstPart = firstPartFringe for n in firstPart: preds = cut_predecessors_of(n,cut,G) firstPart = firstPart.union(preds) return firstPart def get_second_part(cut, G): secondPartFringe = frozenset({a.toNode for a in cut.setOfCutArcs}) secondPart = secondPartFringe for n in secondPart: succs= cut_successors_of(n,cut,G) secondPart = secondPart.union(succs) return secondPart
cut
是有向图G
的一个切如果: (get_first_part(cut, G).union(get_second_part(cut, G)) == G.setOfNodes) and (len(get_first_part(cut, G).intersect(get_second_part(cut, G))) == 0)
如果(x in get_first_part(cut, G)) and (y in get_second_part(cut, G) ) and (x != y)
则cut
称为xy 切割。 当xy 割cut
中的节点x
是源节点且xy割割中的节点y
是终端节点时,则该割称为st 割。
STCut = namedtuple('STCut', ['s','t','cut'])
流网络
您可以使用有向图G
来表示流网络。
分配每个节点n
,其中n
在G.setOfNodes
中是一个n.datum
,它是一个FlowNodeDatum
:
FlowNodeDatum = namedtuple('FlowNodeDatum', ['flowIn','flowOut'])
分配每个弧a
,其中a
在G.setOfArcs
中, a.datum
是FlowArcDatum
。
FlowArcDatum = namedtuple('FlowArcDatum', ['capacity','flow'])
flowNodeDatum.flowIn
和flowNodeDatum.flowOut
是正实数。
flowArcDatum.capacity
和flowArcDatum.flow
也是正实数。
对于G.setOfNodes
中的每个节点节点n
:
n.datum.flowIn = sum({a.datum.flow for a in G.Arcs if a.toNode == n}) n.datum.flowOut = sum({a.datum.flow for a in G.Arcs if a.fromNode == n})
Digraph G
现在代表一个流网络。
G
的流是指G
中所有弧a
的a.flow
。
可行的流程
让有向图G
代表一个流网络。
如果满足以下条件,则由G
表示的流网络具有可行流:
对于
G.setOfNodes
中的每个节点n
,源节点和终端节点除外:n.datum.flowIn = n.datum.flowOut
。对于
G.setOfNodes
中的每个弧a
:a.datum.capacity <= a.datum.flow
。
条件 1 称为守恒约束。
条件 2 称为容量约束。
切割能力
由有向图G
表示的流网络的源节点s
和终端节点t
的st 割stCut
的割容量为:
def cut_capacity(stCut, G): part_1 = get_first_part(stCut.cut,G) part_2 = get_second_part(stCut.cut,G) s_part = part_1 if stCut.s in part_1 else part_2 t_part = part_1 if stCut.t in part_1 else part_2 cut_capacity = sum({a.datum.capacity for a in stCut.cut.setOfCutArcs if ( (a.fromNode in s_part) and (a.toNode in t_part) )}) return cut_capacity
最小产能削减
令stCut = stCut(s,t,cut)
是由有向图G
表示的流网络的st 割。
stCut
是由G
表示的流网络的最小容量割,如果在该流网络中没有其他st 割stCutCompetitor
使得:
cut_capacity(stCut, G) < cut_capacity(stCutCompetitor, G)
剥离流动
我想指的是有向图,它是获取有向图G
并从 G.setOfNodes 中的所有节点以及G.setOfNodes
中的所有弧中剥离所有流数据的G.setOfArcs
。
def strip_flows(G): new_nodes= frozenset( (Node(n.uid, FlowNodeDatum(0.0,0.0)) for n in G.setOfNodes) ) new_arcs = frozenset([]) for a in G.setOfArcs: new_fromNode = Node(a.fromNode.uid, FlowNodeDatum(0.0,0.0)) new_toNode = Node(a.toNode.uid, FlowNodeDatum(0.0,0.0)) new_arc = Arc(new_fromNode, new_toNode, FlowArcDatum(a.datum.capacity, 0.0)) new_arcs = new_arcs.union(frozenset([new_arc])) return DiGraph(new_nodes, new_arcs)
最大流量问题
表示为有向图G
的流网络,G.setOfNodes 中的源节点s
和G.setOfNodes
中的终端节点t
,如果G.setOfNodes
, G
可以表示最大流问题:
(len(list(source_nodes(G))) == 1) and (next(iter(source_nodes(G))) == s) and (len(list(terminal_nodes(G))) == 1) and (next(iter(terminal_nodes(G))) == t)
标记此表示:
MaxFlowProblemState = namedtuple('MaxFlowProblemState', ['G','sourceNodeUid','terminalNodeUid','maxFlowProblemStateUid'])
其中sourceNodeUid = s.uid
、 terminalNodeUid = t.uid
和maxFlowProblemStateUid
是问题实例的标识符。
最大流量解决方案
让maxFlowProblem
代表一个最大流量问题。 maxFlowProblem
的解决方案可以由表示为有向图H
的流网络表示。
Digraph H
是输入python maxFlowProblem
上的最大流量问题的可行解决方案,如果:
strip_flows(maxFlowProblem.G) == strip_flows(H)
。H
是一个流网络,具有可行流。
如果除了1和2:
- 不可能有由有向图
K
表示的其他流网络,这样strip_flows(G) == strip_flows(K)
和find_node_by_uid(t.uid,G).flowIn < find_node_by_uid(t.uid,K).flowIn
。
那么H
也是maxFlowProblem
的最优解。
换句话说,可行的最大流量解决方案可以用有向图表示,其中:
与对应的最大流量问题的有向图
G
相同,但任何节点和弧的n.datum.flowIn
、n.datum.flowOut
和a.datum.flow
可能不同。表示具有可行流的流网络。
并且,如果另外,它可以代表最佳的最大流量解决方案:
- 最大流问题中终端节点对应的节点的
flowIn
尽可能大(条件1和2仍然满足时)。
如果有向图H
表示可行的最大流量解决方案: find_node_by_uid(s.uid,H).flowOut = find_node_by_uid(t.uid,H).flowIn
这遵循最大流量,最小切割定理(下面讨论)。 非正式地,由于假设H
具有可行的流,这意味着流在穿过任何(其他)节点(守恒约束)时既不能“创建”(源节点s
除外)也不能“销毁”(终端节点t
除外)。
由于最大流量问题仅包含单个源节点s
和单个终端节点t
,因此在s
处“创建”的所有流必须在t
处“销毁”,否则流网络没有可行的流(将违反守恒约束) )。
设有向图H
表示可行的最大流量解; 上面的值称为H
的st 流值。
让:
mfps=MaxFlowProblemState(H, maxFlowProblem.sourceNodeUid, maxFlowProblem.terminalNodeUid, maxFlowProblem.maxFlowProblemStateUid)
这意味着mfps
是maxFlowProblem
的后继状态,这仅意味着mfps
与 maxFlowProblem 完全一样,只是maxFlowProblem
中弧a.flow
a.flow
弧a
a
maxFlowProblem.setOfArcs
不同mfps.setOfArcs
。
def get_mfps_flow(mfps): flow_from_s = find_node_by_uid(mfps.sourceNodeUid,mfps.G).datum.flowOut flow_to_t = find_node_by_uid(mfps.terminalNodeUid,mfps.G).datum.flowIn if( (flow_to_t - flow_from_s) > 0): raise Exception('Infeasible st flow') return flow_to_t
这是mfps
及其关联的maxFlowProb
的可视化。 图像中的每条弧a
都有一个标签,这些标签是a.datum.flowFrom / a.datum.flowTo
,图像中的每个节点n
都有一个标签,这些标签是n.uid {n.datum.flowIn / a.datum.flowOut}
。
st 截流
让mfps
代表MaxFlowProblemState
,让stCut
代表mfps.G的一个mfps.G
。 stCut
的切割流定义为:
def get_stcut_flow(stCut,mfps): s = find_node_by_uid(mfps.sourceNodeUid, mfps.G) t = find_node_by_uid(mfps.terminalNodeUid, mfps.G) part_1 = get_first_part(stCut.cut,mfps.G) part_2 = get_second_part(stCut.cut,mfps.G) s_part = part_1 if s in part_1 else part_2 t_part = part_1 if t in part_1 else part_2 s_t_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in s_part) ) t_s_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in t_part) ) cut_flow = s_t_sum - t_s_sum return cut_flow
st cut flow是从包含源节点的分区到包含终端节点的分区的流量总和减去从包含终端节点的分区到包含源节点的分区的流量总和。
最大流量,最小切割
让maxFlowProblem
表示最大流量问题,并让maxFlowProblem
的解决方案由表示为有向图H
的流网络表示。
令minStCut
为maxFlowProblem.G
表示的流网络的最小容量切割。
因为在最大流问题中,流仅起源于单个源节点并终止于单个终端节点,并且由于容量约束和守恒约束,我们知道进入maxFlowProblem.terminalNodeUid
的所有流必须穿过任何st cut ,特别是它必须跨越minStCut
。 这意味着:
get_flow_value(H, maxFlowProblem) <= cut_capacity(minStCut, maxFlowProblem.G)
解决最大流量问题
解决最大流量问题maxFlowProblem
的基本思想是从有向图H
表示的最大流量解开始。 这样的起点可以是H = strip_flows(maxFlowProblem.G)
。 然后的任务是使用H
并通过对H.setOfArcs
中某些弧a
的a.datum.flow
值的一些贪婪修改来产生由有向图K
表示的另一个最大流量解决方案,这样K
仍然不能表示具有可行流的流网络和get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem)
。 只要这个过程继续下去,最近遇到的最大流量解决方案( K
) 的质量 ( get_flow_value(K, maxFlowProblem)
) 就优于已找到的任何其他最大流量解决方案。 如果该过程达到了它知道不可能有其他改进的程度,则该过程可以终止并且它将返回最优最大流量解决方案。
上面的描述是一般性的,并跳过了许多证明,例如这样的过程是否可能或可能需要多长时间,我将提供更多细节和算法。
最大流量,最小割定理
从 Ford 和 Fulkerson 所著的 Flows in Networks 一书中,最大流最小割定理(定理 5.1)的陈述是:
对于任何网络,从
s
到t
的最大流量值等于分离s
和t
的所有切割的最小切割容量。
使用这篇文章中的定义,可以转化为:
由表示为有向图H
的流网络表示的maxFlowProblem
的解决方案是最优的,如果:
get_flow_value(H, maxFlowProblem) == cut_capacity(minStCut, maxFlowProblem.G)
我喜欢这个定理证明,维基百科有另一个证明。
最大流最小割定理用于证明Ford-Fulkerson方法的正确性和完备性。
我还将在扩充路径后的部分中给出定理的证明。
Ford-Fulkerson 方法和 Edmonds-Karp 算法
CLRS 定义了 Ford-Fulkerson 方法,如下所示(第 26.2 节):
FORD-FULKERSON-METHOD(G, s, t): initialize flow f to 0 while there exists an augmenting path p in the residual graph G_f augment flow f along
残差图
表示为有向图G
的流网络的残差图可以表示为有向图G_f
:
ResidualDatum = namedtuple('ResidualDatum', ['residualCapacity','action']) def agg_n_to_u_cap(n,u,G_as_dict): arcs_out = G_as_dict[n] return sum([a.datum.capacity for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ]) def agg_n_to_u_flow(n,u,G_as_dict): arcs_out = G_as_dict[n] return sum([a.datum.flow for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ]) def get_residual_graph_of(G): G_as_dict = digraph_to_dict(G) residual_nodes = G.setOfNodes residual_arcs = frozenset([]) for n in G_as_dict: arcs_from = G_as_dict[n] nodes_to = frozenset([find_node_by_uid(a.toNode.uid,G) for a in arcs_from]) for u in nodes_to: n_to_u_cap_sum = agg_n_to_u_cap(n,u,G_as_dict) n_to_u_flow_sum = agg_n_to_u_flow(n,u,G_as_dict) if(n_to_u_cap_sum > n_to_u_flow_sum): flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL) residual_arcs = residual_arcs.union( frozenset([Arc(n,u,datum=ResidualDatum(flow, 'push'))]) ) if(n_to_u_flow_sum > 0.0): flow = round(n_to_u_flow_sum, TOL) residual_arcs = residual_arcs.union( frozenset([Arc(u,n,datum=ResidualDatum(flow, 'pull'))]) ) return DiGraph(residual_nodes, residual_arcs)
agg_n_to_u_cap(n,u,G_as_dict)
返回 G.setOfArcs 子集中所有弧的a.datum.capacity
的总和,如果G.setOfArcs
a.fromNode = n
和a.toNode = u
,则弧a
在子集中。agg_n_to_u_cap(n,u,G_as_dict)
返回 G.setOfArcs 子集中所有弧的a.datum.flow
的总和,如果G.setOfArcs
a.fromNode = n
和a.toNode = u
,弧a
在子集中。
简而言之,残差图G_f
表示可以在有向图G
上执行的某些动作。
有向图G
表示的流网络G.setOfNodes
中的每一对节点n,u
可以在G
的残差图G_f
中生成 0、1 或 2 条弧。
如果在
G.setOfArcs
中没有弧a
使得a.fromNode = n
和a.toNode = u
,则对n,u
不会在G_f
中生成任何弧。对
n,u
在G_f.setOfArcs
中生成弧a
,其中a
表示标记为从n
到u
的推流弧aa = Arc(n,u,datum=ResidualNode(n_to_u_cap_sum - n_to_u_flow_sum))
如果n_to_u_cap_sum > n_to_u_flow_sum
。对
n,u
在G_f.setOfArcs
中生成弧a
,其中a
表示标记为从n
到u
的拉流弧的弧a = Arc(n,u,datum=ResidualNode(n_to_u_cap_sum - n_to_u_flow_sum))
如果n_to_u_flow_sum > 0.0
。
G_f.setOfArcs
中的每个推流弧表示将总共x <= n_to_u_cap_sum - n_to_u_flow_sum
流添加到 G.setOfArcs 子集中的弧的G.setOfArcs
,其中弧a
在子集中,如果a.fromNode = n
和a.toNode = u
。G_f.setOfArcs
中的每个拉流弧表示将总计x <= n_to_u_flow_sum
流减去 G.setOfArcs 子集中的弧的G.setOfArcs
,其中如果a.fromNode = n
和a.toNode = u
,弧a
在子集中。
在G
中的适用弧上从G_f
执行单个推或拉动作可能会生成一个没有可行流的流网络,因为在生成的流网络中可能违反容量约束或守恒约束。
这是上一个最大流量解决方案可视化示例的残差图的可视化,在可视化中,每条弧a
代表a.residualCapacity
。
增强路径
令maxFlowProblem
为最大流量问题,令G_f = get_residual_graph_of(G)
为maxFlowProblem.G
的残差图。
maxFlowProblem
的增广路径augmentingPath
是从find_node_by_uid(maxFlowProblem.sourceNode,G_f)
到find_node_by_uid(maxFlowProblem.terminalNode,G_f)
的任何路径。
事实证明,增广路径augmentingPath
可以应用于由有向图H
表示的最大流量解决方案,生成由有向图K
表示的另一个最大流量解决方案,其中get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem)
如果H
不是最优的。
这是如何做:
def augment(augmentingPath, H): augmentingPath = list(augmentingPath) H_as_dict = digraph_to_dict(H) new_nodes = frozenset({}) new_arcs = frozenset({}) visited_nodes = frozenset({}) visited_arcs = frozenset({}) bottleneck_residualCapacity = min( augmentingPath, key=lambda a: a.datum.residualCapacity ).datum.residualCapacity for x in augmentingPath: from_node_uid = x.fromNode.uid if x.datum.action == 'push' else x.toNode.uid to_node_uid = x.toNode.uid if x.datum.action == 'push' else x.fromNode.uid from_node = find_node_by_uid( from_node_uid, H ) to_node = find_node_by_uid( to_node_uid, H ) load = bottleneck_residualCapacity if x.datum.action == 'push' else -bottleneck_residualCapacity for a in H_as_dict[from_node]: if(a.toNode == to_node): test_sum = a.datum.flow + load new_flow = a.datum.flow new_from_node_flow_out = a.fromNode.datum.flowOut new_to_node_flow_in = a.toNode.datum.flowIn new_from_look = {n for n in new_nodes if (n.uid == a.fromNode.uid)} new_to_look = {n for n in new_nodes if (n.uid == a.toNode.uid) } prev_from_node = new_from_look.pop() if (len(new_from_look)>0) else a.fromNode prev_to_node = new_to_look.pop() if (len(new_to_look)>0) else a.toNode new_nodes = new_nodes.difference(frozenset({prev_from_node})) new_nodes = new_nodes.difference(frozenset({prev_to_node})) if(test_sum > a.datum.capacity): new_flow = a.datum.capacity new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + a.datum.capacity new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow + a.datum.capacity load = test_sum - a.datum.capacity elif(test_sum < 0.0): new_flow = 0.0 new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow load = test_sum else: new_flow = test_sum new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + new_flow new_to_node_flow_in = prev_to_node.datum.flowIn - a.datum.flow + new_flow load = 0.0 new_from_node_flow_out = round(new_from_node_flow_out, TOL) new_to_node_flow_in = round(new_to_node_flow_in, TOL) new_flow = round(new_flow, TOL) new_from_node = Node(prev_from_node.uid, FlowNodeDatum(prev_from_node.datum.flowIn, new_from_node_flow_out)) new_to_node = Node(prev_to_node.uid, FlowNodeDatum(new_to_node_flow_in, prev_to_node.datum.flowOut)) new_arc = Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, new_flow)) visited_nodes = visited_nodes.union(frozenset({a.fromNode,a.toNode})) visited_arcs = visited_arcs.union(frozenset({a})) new_nodes = new_nodes.union(frozenset({new_from_node, new_to_node})) new_arcs = new_arcs.union(frozenset({new_arc})) not_visited_nodes = H.setOfNodes.difference(visited_nodes) not_visited_arcs = H.setOfArcs.difference(visited_arcs) full_new_nodes = new_nodes.union(not_visited_nodes) full_new_arcs = new_arcs.union(not_visited_arcs) G = DiGraph(full_new_nodes, full_new_arcs) full_new_arcs_update = frozenset([]) for a in full_new_arcs: new_from_node = a.fromNode new_to_node = a.toNode new_from_node = find_node_by_uid( a.fromNode.uid, G ) new_to_node = find_node_by_uid( a.toNode.uid, G ) full_new_arcs_update = full_new_arcs_update.union( {Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, a.datum.flow))} ) G = DiGraph(full_new_nodes, full_new_arcs_update) return G
在上面, TOL
是对网络中的流量值进行四舍五入的一些容差值。 这是为了避免浮点计算的级联不精确。 因此,例如,我使用TOL = 10
来表示四舍五入到 10 位有效数字。
令K = augment(augmentingPath, H)
,则K
表示maxFlowProblem
的可行最大流量解决方案。 要使陈述成立,由K
表示的流网络必须具有可行的流(不违反容量约束或守恒约束。
原因如下:在上述方法中,添加到由有向图K
表示的新流网络的每个节点要么是有向图H
中节点的精确副本,要么是在其n.datum.flowIn
中添加了相同编号的节点n
它的n.datum.flowOut
。 这意味着只要在H
中满足守恒约束,就在K
中满足守恒约束。 守恒约束得到满足,因为我们明确检查网络中的任何新弧a
是否具有a.datum.flow <= a.datum.capacity
; 因此,只要未修改地复制到K.setOfArcs
中的集合H.setOfArcs
中的弧不违反容量约束,则K
不违反容量约束。
如果H
不是最优的, get_flow_value(H, maxFlowProblem) < get_flow_value(K, maxFlowProblem)
也是如此。
原因如下:对于最大流问题maxFlowProblem
的残差图G_f
的有向图表示中存在的增广路径augmentingPath
,那么增广augmentingPath
上的最后一条弧a
必须是“推”弧,并且它必须具有a.toNode == maxFlowProblem.terminalNode
。 增广路径定义为终止于最大流量问题的终端节点的增广路径。 根据残差图的定义,很明显,残差图上的增广路径中的最后一条弧必须是“推”弧,因为增广路径中的任何“拉”弧b
都将具有b.toNode == maxFlowProblem.terminalNode
和b.fromNode != maxFlowProblem.terminalNode
来自path的定义。 另外,从path的定义来看,很明显终端节点只被augment
方法修改过一次。 因此, augment
只修改了一次maxFlowProblem.terminalNode.flowIn
并增加了maxFlowProblem.terminalNode.flowIn
的值,因为augmentingPath
中的最后一条弧必须是导致在augment
期间修改maxFlowProblem.terminalNode.flowIn
的弧。 从适用于“推”弧的augment
的定义来看, maxFlowProblem.terminalNode.flowIn
只能增加,不能减少。
来自 Sedgewick 和 Wayne 的一些证明
Robert Sedgewich 和 Kevin Wayne 所著的 Algorithms,第四版这本书有一些非常有用的简短证明(第 892-894 页)。 我将在这里重新创建它们,尽管我将使用适合以前定义的语言。 我的校样标签与 Sedgewick 书中的标签相同。
命题 E:对于表示最大流量问题maxFlowProblem
的可行最大流量解的任何有向图H
,对于任何stCut
get_stcut_flow(stCut,H,maxFlowProblem) = get_flow_value(H, maxFlowProblem)
。
证明:令stCut=stCut(maxFlowProblem.sourceNode,maxFlowProblem.terminalNode,set([a for a in H.setOfArcs if a.toNode == maxFlowProblem.terminalNode]))
。 命题 E对stCut
直接从st flow value的定义成立。 假设我们希望将某个节点n
从 s-partition ( get_first_part(stCut.cut, G)
) 移动到 t-partition (get_second_part(stCut.cut, G))
中,为此我们需要更改stCut.cut
,这可能会改变stcut_flow = get_stcut_flow(stCut,H,maxFlowProblem)
并使命题 E无效。 但是,让我们看看stcut_flow
的值在我们进行更改时将如何变化。 节点n
处于平衡状态,这意味着流入节点n
的流量总和等于流出节点 n 的流量总和(这是H
表示可行解所必需的)。 请注意,作为stcut_flow
的一部分进入节点n
的所有流都从 s 分区进入(从 t 分区直接或间接进入节点n
的流不会被计入stcut_flow
值,因为它的方向是错误的根据定义)。 此外,所有离开n
的流最终将(直接或间接)流入终端节点(如前所述)。 当我们将节点n
移动到 t-partition 中时,所有从 s-partition 进入n
的流必须添加到新的stcut_flow
值; 但是,必须从新的stcut_flow
值中减去所有退出n
的流; 直接进入 t-partition 的流部分被减去,因为该流现在位于新 t-partition 的内部并且不计为stcut_flow
。 从n
流向 s-partition 中的节点的部分流也必须从stcut_flow
中减去:在n
移动到 t-partition 之后,这些流将从 t-partition 引导到 s-partition 等等不能在stcut_flow
中考虑,因为这些流被删除,流入 s-partition 并且必须减去这些流的总和,以及从 s-partition 到 t-partition 的流出(其中所有流量来自st 必须结束)必须减少相同的数量。 由于节点n
在该过程之前处于平衡状态,因此更新将在新的stcut_flow
值中添加相同的值,因为它减去了因此在更新之后使命题 E为真。 然后,命题 E的有效性来自对 t 分区大小的归纳。
以下是一些示例流网络,以帮助可视化命题 E成立的不太明显的情况; 图中,红色区域表示 s-partition,蓝色区域表示 t-partition,绿色弧线表示st cut 。 在第二幅图中,节点A 和节点B 之间的流量增加,而流入终端节点t 的流量没有变化。:

推论:任何st cut 的流量值都不能超过任何st cut的容量。
命题 F.(最大流,最小割定理):设f
为st 流。 以下3个条件是等价的:
存在一个容量等于流
f
的值的st 割。f
是最大流量。没有关于
f
的增广路径。
条件 1 由推论暗示条件 2。 条件 2 意味着条件 3,因为增广路径的存在意味着存在具有较大值的流,这与f
的最大值相矛盾。 条件 3 意味着条件 1:让C_s
是可以从s
到达的所有节点的集合,在残差图中具有增广路径。 让C_t
是剩余的arcs ,那么t
必须在C_t
中(根据我们的假设)。 从C_s
到C_t
的弧然后形成一个st 切割,其中仅包含弧a
其中a.datum.capacity = a.datum.flow
或a.datum.flow = 0.0
。 如果不是这样,那么由具有剩余剩余容量的弧连接到C_s
的节点将在集合C_s
中,因为从s
到这样的节点会有一条增广路径。 通过st 割的流量等于st 割的容量(因为从C_s
到C_t
的弧的流量等于容量),也等于st 流量的值(根据命题 E )。
最大流最小割定理的这个陈述暗示了网络中的流的早期陈述。
推论(整数属性):当容量为整数时,存在一个整数值的最大流,Ford-Fulkerson 算法找到它。
证明:每条增广路径将流量增加一个正整数,即“推”弧和“拉”弧中未使用容量的最小值,所有这些都是正整数。
这证明了来自CLRS的Ford-Fulkerson 方法描述。 方法是不断寻找增广路径,并maxFlowSolution
augment
更好的解决方案,直到没有更多的增广路径意味着最新的最大流量解决方案是最优的。
从福特-富尔克森到埃德蒙兹-卡普
关于解决最大流量问题的其余问题是:
应该如何构建增广路径?
如果我们使用实数而不是整数,该方法会终止吗?
终止(如果终止)需要多长时间?
Edmonds-Karp 算法指定每个增广路径由残差图的广度优先搜索(BFS)构造; 事实证明,上述第 1 点的决定也将迫使算法终止(第 2 点)并允许确定渐近时间和空间复杂度。
首先,这是一个BFS实现:
def bfs(sourceNode, terminalNode, G_f): G_f_as_dict = digraph_to_dict(G_f) parent_arcs = dict([]) visited = frozenset([]) deq = deque([sourceNode]) while len(deq) > 0: curr = deq.popleft() if curr == terminalNode: break for a in G_f_as_dict.get(curr): if (a.toNode not in visited): visited = visited.union(frozenset([a.toNode])) parent_arcs[a.toNode] = a deq.extend([a.toNode]) path = list([]) curr = terminalNode while(curr != sourceNode): if (curr not in parent_arcs): err_msg = 'No augmenting path from {} to {}.'.format(sourceNode.uid, terminalNode.uid) raise StopIteration(err_msg) path.extend([parent_arcs[curr]]) curr = parent_arcs[curr].fromNode path.reverse() test = deque([path[0].fromNode]) for a in path: if(test[-1] != a.fromNode): err_msg = 'Bad path: {}'.format(path) raise ValueError(err_msg) test.extend([a.toNode]) return path
我使用了来自 python 集合模块的双端队列。
为了回答上面的问题 2,我将解释 Sedgewick 和 Wayne 的另一个证明:命题 G。具有N
个节点和A
弧的Edmonds-Karp算法中所需的增广路径数最多为NA/2
。 证明:每条增广路径都有一条瓶颈弧——一条从残差图中被删除的弧,因为它对应于一个“推”弧,该弧对应于一个填充到容量的“推”弧,或者一个“拉”弧,通过该弧,流量变为 0。每次arc成为瓶颈arc ,通过它的任何增广路径的长度必须增加 2 倍。这是因为路径中的每个节点可能只出现一次或根本不出现(根据path的定义),因为路径正在从最短路径到最长路径进行探索,这意味着通过特定瓶颈节点的下一条路径必须访问至少一个节点,这意味着在我们到达节点之前路径上还有另外 2 个弧。 由于增广路径的长度最多为N
每条弧最多可以在N/2
增广路径上,并且增广路径的总数最多为NA/2
。
Edmonds-Karp 算法在O(NA^2)
中执行。 如果在算法期间最多探索NA/2
条路径,并且使用BFS探索每条路径是N+A
则乘积的最重要项,因此渐近复杂度为O(NA^2)
。
让mfp
成为maxFlowProblemState
。
def edmonds_karp(mfp): sid, tid = mfp.sourceNodeUid, mfp.terminalNodeUid no_more_paths = False loop_count = 0 while(not no_more_paths): residual_digraph = get_residual_graph_of(mfp.G) try: asource = find_node_by_uid(mfp.sourceNodeUid, residual_digraph) aterm = find_node_by_uid(mfp.terminalNodeUid, residual_digraph) apath = bfs(asource, aterm, residual_digraph) paint_mfp_path(mfp, loop_count, apath) G = augment(apath, mfp.G) s = find_node_by_uid(sid, G) t = find_node_by_uid(tid, G) mfp = MaxFlowProblemState(G, s.uid, t.uid, mfp.maxFlowProblemStateUid) loop_count += 1 except StopIteration as e: no_more_paths = True return mfp
上面的版本效率低下,并且比O(NA^2)
复杂度更差,因为它每次都构造一个新的最大流量解决方案和一个新的残差图(而不是随着算法的进步而修改现有的有向图)。 为了获得真正的O(NA^2)
解决方案,算法必须同时维护表示最大流问题状态的有向图及其相关的残差图。 因此算法必须避免不必要地迭代弧和节点,并仅在必要时更新它们的值和残差图中的关联值。
为了编写更快的Edmonds Karp算法,我重写了上面的几段代码。 我希望通过生成新有向图的代码有助于理解正在发生的事情。 在快速算法中,我使用了一些我不想详细介绍的新技巧和 Python 数据结构。 我会说a.fromNode
和a.toNode
现在被视为字符串和节点的 uid。 对于此代码,让mfps
为maxFlowProblemState
import uuid def fast_get_mfps_flow(mfps): flow_from_s = {n for n in mfps.G.setOfNodes if n.uid == mfps.sourceNodeUid}.pop().datum.flowOut flow_to_t = {n for n in mfps.G.setOfNodes if n.uid == mfps.terminalNodeUid}.pop().datum.flowIn if( (flow_to_t - flow_from_s) > 0.00): raise Exception('Infeasible st flow') return flow_to_t def fast_e_k_preprocess(G): G = strip_flows(G) get = dict({}) get['nodes'] = dict({}) get['node_to_node_capacity'] = dict({}) get['node_to_node_flow'] = dict({}) get['arcs'] = dict({}) get['residual_arcs'] = dict({}) for a in G.setOfArcs: if(a.fromNode not in G.setOfNodes): err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) if(a.toNode not in G.setOfNodes): err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals()) raise KeyError(err_msg) get['nodes'][a.fromNode.uid] = a.fromNode get['nodes'][a.toNode.uid] = a.toNode lark = Arc(a.fromNode.uid, a.toNode.uid, FlowArcDatumWithUid(a.datum.capacity, a.datum.flow, uuid.uuid4())) if(a.fromNode.uid not in get['arcs']): get['arcs'][a.fromNode.uid] = dict({a.toNode.uid : dict({lark.datum.uid : lark})}) else: if(a.toNode.uid not in get['arcs'][a.fromNode.uid]): get['arcs'][a.fromNode.uid][a.toNode.uid] = dict({lark.datum.uid : lark}) else: get['arcs'][a.fromNode.uid][a.toNode.uid][lark.datum.uid] = lark for a in G.setOfArcs: if a.toNode.uid not in get['arcs']: get['arcs'][a.toNode.uid] = dict({}) for n in get['nodes']: get['residual_arcs'][n] = dict() get['node_to_node_capacity'][n] = dict() get['node_to_node_flow'][n] = dict() for u in get['nodes']: n_to_u_cap_sum = sum(a.datum.capacity for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) ) n_to_u_flow_sum = sum(a.datum.flow for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) ) if(n_to_u_cap_sum > n_to_u_flow_sum): flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL) get['residual_arcs'][n][u] = Arc(n,u,ResidualDatum(flow, 'push')) if(n_to_u_flow_sum > 0.0): flow = round(n_to_u_flow_sum, TOL) get['residual_arcs'][u][n] = Arc(u,n,ResidualDatum(flow, 'pull')) get['node_to_node_capacity'][n][u] = n_to_u_cap_sum get['node_to_node_flow'][n][u] = n_to_u_flow_sum return get def fast_bfs(sid, tid, get): parent_of = dict([]) visited = frozenset([]) deq = coll.deque([sid]) while len(deq) > 0: n = deq.popleft() if n == tid: break for u in get['residual_arcs'][n]: if (u not in visited): visited = visited.union(frozenset({u})) parent_of[u] = get['residual_arcs'][n][u] deq.extend([u]) path = list([]) curr = tid while(curr != sid): if (curr not in parent_of): err_msg = 'No augmenting path from {} to {}.'.format(sid, curr) raise StopIteration(err_msg) path.extend([parent_of[curr]]) curr = parent_of[curr].fromNode path.reverse() return path def fast_edmonds_karp(mfps): sid, tid = mfps.sourceNodeUid, mfps.terminalNodeUid get = fast_e_k_preprocess(mfps.G) no_more_paths, loop_count = False, 0 while(not no_more_paths): try: apath = fast_bfs(sid, tid, get) get = fast_augment(apath, get) loop_count += 1 except StopIteration as e: no_more_paths = True nodes = frozenset(get['nodes'].values()) arcs = frozenset({}) for from_node in get['arcs']: for to_node in get['arcs'][from_node]: for arc in get['arcs'][from_node][to_node]: arcs |= frozenset({get['arcs'][from_node][to_node][arc]}) G = DiGraph(nodes, arcs) mfps = MaxFlowProblemState(G, sourceNodeUid=sid, terminalNodeUid=tid, maxFlowProblemStateUid=mfps.maxFlowProblemStateUid) return mfps
这是该算法如何从上面解决示例流网络的可视化。 可视化显示步骤,因为它们反映在代表最新流网络的有向图G
中,并且它们反映在该流网络的残差图中。 残差图中的增广路径显示为红色路径,表示受给定增广路径影响的节点和弧集的问题的有向图以绿色突出显示。 在每种情况下,我都会突出显示图表中将要更改的部分(红色或绿色),然后在更改后显示图表(仅以黑色显示)。
这是该算法如何解决不同示例流网络的另一个可视化。 请注意,这个使用实数并包含多个具有相同fromNode
和toNode
值的弧。
**另请注意,由于带有“拉”残差数据的弧可能是增广路径的一部分,因此在流动网络的有向图中受影响的节点_可能不在G!
.
二分图
假设我们有一个有向图G
,如果可以将G.setOfNodes
中的节点划分为两个集合( part_1
和part_2
),则G
是二分的,这样对于part_1
中的任何弧a
, G.setOfArcs
中的a.fromNode
和a.toNode
在part_1
中。 a.fromNode
中的part_2
和a.toNode
中的part_2
也不是真的。
换句话说,如果 G 可以划分为两组节点,则G
是二分的,使得每条弧都必须将一组中的节点连接到另一组中的节点。
测试二分法
假设我们有一个有向图G
,我们想测试它是否是二分的。 我们可以在O(|G.setOfNodes|+|G.setOfArcs|)
中通过贪婪地将图形着色为两种颜色来做到这一点。
首先,我们需要生成一个新的有向图H
。 该图将具有与G
相同的节点集,但它将具有比G
更多的弧。 G
中的每个弧a
将在H
中创建 2 个弧; 第一条弧与a
相同,第二条弧反转a
的导向器( b = Arc(a.toNode,a.fromNode,a.datum)
)。
Bipartition = coll.namedtuple('Bipartition',['firstPart', 'secondPart', 'G']) def bipartition(G): nodes = frozenset(G.setOfNodes arcs = frozenset(G.setOfArcs) arcs = arcs.union( frozenset( {Arc(a.toNode,a.fromNode,a.datum) for a in G.setOfArcs} ) ) H = DiGraph(nodes, arcs) H_as_dict = digraph_to_dict(H) color = dict([]) some_node = next(iter(H.setOfNodes)) deq = coll.deque([some_node]) color[some_node] = -1 while len(deq) > 0: curr = deq.popleft() for a in H_as_dict.get(curr): if (a.toNode not in color): color[a.toNode] = -1*color[curr] deq.extend([a.toNode]) elif(color[curr] == color[a.toNode]): print(curr,a.toNode) raise Exception('Not Bipartite.') firstPart = frozenset( {n for n in color if color[n] == -1 } ) secondPart = frozenset( {n for n in color if color[n] == 1 } ) if( firstPart.union(secondPart) != G.setOfNodes ): raise Exception('Not Bipartite.') return Bipartition(firstPart, secondPart, G)
匹配和最大匹配
假设我们有一个有向图G
并且matching
是来自G.setOfArcs
的弧的子集。 matching
是匹配中任意两个弧a
和b
的matching
: len(frozenset( {a.fromNode} ).union( {a.toNode} ).union( {b.fromNode} ).union( {b.toNode} )) == 4
。 换句话说,匹配中没有两条弧共享一个节点。
如果G
中没有其他匹配的alt_matching
使得len(matching) < len(alt_matching)
,则匹配matching
是最大匹配。 换句话说, matching
是最大匹配,如果它是来自G.setOfArcs
的最大弧集,仍然满足匹配的定义(添加任何不在匹配中的弧都会破坏匹配定义)。
如果 G.setOfArcs 中的每个节点n
在G.setOfArcs
a.fromNode == n or a.toNode == n
的matching
中存在一个弧a
,则最大匹配matching
是完美匹配。
最大二分匹配
最大二分匹配是二分的有向图G
上的最大匹配。
给定G
是二分的,寻找最大二分匹配的问题可以转化为可用Edmonds-Karp算法求解的最大流量问题,然后最大二分匹配可以从最大流量问题的解中恢复。
让bipartition
是G
的一个二分。
为此,我需要生成一个新的有向图( H
),其中包含一些新节点( H.setOfNodes
)和一些新弧( H.setOfArcs
)。 H.setOfNodes
包含G.setOfNodes
中的所有节点以及另外两个节点s
(源节点)和t
(终端节点)。
H.setOfArcs
将为每个G.setOfArcs
包含一个弧。 如果弧a
在G.setOfArcs
中, a.fromNode
在bipartition.firstPart
中, a.toNode
在bipartition.secondPart
中,则在H
中包含a
(添加FlowArcDatum(1,0)
)。
如果a.fromNode
在bipartition.secondPart
中并且a.toNode
在bipartition.firstPart
中,则在 H.setOfArcs 中包含 Arc(a.toNode,a.fromNode, H.setOfArcs
Arc(a.toNode,a.fromNode,FlowArcDatum(1,0))
。
二分图的定义确保没有弧连接两个节点位于同一分区中的任何节点。 H.setOfArcs
还包含从节点s
到bipartition.firstPart
中每个节点的弧。 最后, H.setOfArcs
包含bipartition.secondPart
中的每个节点到节点t
的弧。 对于 H.setOfArcs 中a
所有H.setOfArcs
a.datum.capacity = 1
。
首先将G.setOfNodes
中的节点划分为两个不相交的集合( part1
和part2
),这样G.setOfArcs
中的任何弧都不会从一个集合指向同一个集合(此分区是可能的,因为G
是二分的)。 接下来,将G.setOfArcs
中从part1
指向part2
的所有弧添加到H.setOfArcs
中。 然后创建单个源节点s
和单个终端节点t
并创建更多弧
然后,构造一个maxFlowProblemState
。
def solve_mbm( bipartition ): s = Node(uuid.uuid4(), FlowNodeDatum(0,0)) t = Node(uuid.uuid4(), FlowNodeDatum(0,0)) translate = {} arcs = frozenset([]) for a in bipartition.G.setOfArcs: if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) ): fark = Arc(a.fromNode,a.toNode,FlowArcDatum(1,0)) arcs = arcs.union({fark}) translate[frozenset({a.fromNode.uid,a.toNode.uid})] = a elif ( (a.toNode in bipartition.firstPart) and (a.fromNode in bipartition.secondPart) ): bark = Arc(a.toNode,a.fromNode,FlowArcDatum(1,0)) arcs = arcs.union({bark}) translate[frozenset({a.fromNode.uid,a.toNode.uid})] = a arcs1 = frozenset( {Arc(s,n,FlowArcDatum(1,0)) for n in bipartition.firstPart } ) arcs2 = frozenset( {Arc(n,t,FlowArcDatum(1,0)) for n in bipartition.secondPart } ) arcs = arcs.union(arcs1.union(arcs2)) nodes = frozenset( {Node(n.uid,FlowNodeDatum(0,0)) for n in bipartition.G.setOfNodes} ).union({s}).union({t}) G = DiGraph(nodes, arcs) mfp = MaxFlowProblemState(G, s.uid, t.uid, 'bipartite') result = edmonds_karp(mfp) lookup_set = {a for a in result.G.setOfArcs if (a.datum.flow > 0) and (a.fromNode.uid != s.uid) and (a.toNode.uid != t.uid)} matching = {translate[frozenset({a.fromNode.uid,a.toNode.uid})] for a in lookup_set} return matching
最小节点覆盖
有向图G
中的节点覆盖是来自G.setOfNodes
的一组节点( cover
),因此对于G.setOfArcs
的任何弧a
,这必须为真: (a.fromNode in cover) or (a.toNode in cover)
。
最小节点覆盖是图中仍然是节点覆盖的最小可能节点集。 Konig 定理指出,在二分图中,该图上最大匹配的大小等于最小节点覆盖的大小,它表明节点覆盖如何从最大匹配中恢复:
假设我们有bipartition bipartition
和最大匹配matching
。 定义一个新的有向图H
, H.setOfNodes=G.setOfNodes
, H.setOfArcs
中的弧是两个集合的并集。
第一组是matching
的弧a
,如果a.fromNode in bipartition.firstPart
的 a.fromNode 和a.toNode in bipartition.secondPart
中的 a.toNode 则在创建的弧中交换a.fromNode
和a.toNode
给这样的弧a.datum.inMatching=True
属性表示它们是从匹配的弧中派生的。
第二组是弧a
NOT in matching
,如果a.fromNode in bipartition.secondPart
和a.toNode in bipartition.firstPart
则a.fromNode
和a.toNode
在创建的弧中交换(给这样的弧a a.datum.inMatching=False
属性)。
接下来,从bipartition.firstPart
中的每个节点n
开始运行深度优先搜索(DFS),对于matching
的任何弧a
,它既不是n == a.fromNode
也不是n == a.toNodes
。 在 DFS 期间,访问了一些节点,而一些没有访问(将此信息存储在n.datum.visited
字段中)。 最小节点覆盖是节点{a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (a.fromNode.datum.visited) ) }
和节点{a.fromNode for a in H.setOfArcs if (a.datum.inMatching) and (not a.toNode.datum.visited)}
。
这可以通过矛盾证明从最大匹配引导到最小节点覆盖,取一些本应未被覆盖的弧a
并考虑关于a.fromNode
和a.toNode
是否属于的所有四种情况(无论是toNode
还是fromNode
) 到匹配matching
中的任何弧。 由于DFS访问节点的顺序以及matching
是最大匹配的事实,每种情况都会导致矛盾。
假设我们有一个函数来执行这些步骤,并在给定有向图G
和最大匹配matching
时返回包含最小节点覆盖的节点集:
ArcMatchingDatum = coll.namedtuple('ArcMatchingDatum', ['inMatching' ]) NodeMatchingDatum = coll.namedtuple('NodeMatchingDatum', ['visited']) def dfs_helper(snodes, G): sniter, do_stop = iter(snodes), False visited, visited_uids = set(), set() while(not do_stop): try: stack = [ next(sniter) ] while len(stack) > 0: curr = stack.pop() if curr.uid not in visited_uids: visited = visited.union( frozenset( {Node(curr.uid, NodeMatchingDatum(False))} ) ) visited_uids = visited.union(frozenset({curr.uid})) succ = frozenset({a.toNode for a in G.setOfArcs if a.fromNode == curr}) stack.extend( succ.difference( frozenset(visited) ) ) except StopIteration as e: stack, do_stop = [], True return visited def get_min_node_cover(matching, bipartition): nodes = frozenset( { Node(n.uid, NodeMatchingDatum(False)) for n in bipartition.G.setOfNodes} ) G = DiGraph(nodes, None) charcs = frozenset( {a for a in matching if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) )} ) arcs0 = frozenset( { Arc(find_node_by_uid(a.toNode.uid,G), find_node_by_uid(a.fromNode.uid,G), ArcMatchingDatum(True) ) for a in charcs } ) arcs1 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid,G), find_node_by_uid(a.toNode.uid,G), ArcMatchingDatum(True) ) for a in matching.difference(charcs) } ) not_matching = bipartition.G.setOfArcs.difference( matching ) charcs = frozenset( {a for a in not_matching if ( (a.fromNode in bipartition.secondPart) and (a.toNode in bipartition.firstPart) )} ) arcs2 = frozenset( { Arc(find_node_by_uid(a.toNode.uid,G), find_node_by_uid(a.fromNode.uid,G), ArcMatchingDatum(False) ) for a in charcs } ) arcs3 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid,G), find_node_by_uid(a.toNode.uid,G), ArcMatchingDatum(False) ) for a in not_matching.difference(charcs) } ) arcs = arcs0.union(arcs1.union(arcs2.union(arcs3))) G = DiGraph(nodes, arcs) bip = Bipartition({find_node_by_uid(n.uid,G) for n in bipartition.firstPart},{find_node_by_uid(n.uid,G) for n in bipartition.secondPart},G) match_from_nodes = frozenset({find_node_by_uid(a.fromNode.uid,G) for a in matching}) match_to_nodes = frozenset({find_node_by_uid(a.toNode.uid,G) for a in matching}) snodes = bip.firstPart.difference(match_from_nodes).difference(match_to_nodes) visited_nodes = dfs_helper(snodes, bip.G) not_visited_nodes = bip.G.setOfNodes.difference(visited_nodes) H = DiGraph(visited_nodes.union(not_visited_nodes), arcs) cover1 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (a.fromNode.datum.visited) ) } ) cover2 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (not a.toNode.datum.visited) ) } ) min_cover_nodes = cover1.union(cover2) true_min_cover_nodes = frozenset({find_node_by_uid(n.uid, bipartition.G) for n in min_cover_nodes}) return min_cover_nodes
线性分配问题
线性分配问题包括在加权二分图中找到最大权重匹配。
像本文开头的问题可以表示为线性分配问题。 给定一组工人、一组任务和一个函数,该函数指示将一名工人分配给一项任务的盈利能力,我们希望最大化我们所做的所有分配的总和; 这是一个线性分配问题。
假设任务和工人的数量是相等的,尽管我会证明这个假设很容易消除。 在实现中,我用属性a.datum.weight
为弧a
表示弧权重。
WeightArcDatum = namedtuple('WeightArcDatum', [weight])
Kuhn-Munkres 算法
Kuhn-Munkres 算法解决了线性分配问题。 一个好的实现可能需要O(N^{4})
时间,(其中N
是表示问题的有向图中的节点数)。 一个更容易解释的实现需要O(N^{5})
(用于重新生成DiGraphs的版本)和O(N^{4})
用于(用于维护DiGraphs的版本)。 这类似于Edmonds-Karp算法的两种不同实现。
对于此描述,我只使用完整的二分图(其中一半节点位于二分之一部分而另一半位于第二部分的那些)。 在工人中,任务动机意味着有与任务一样多的工人。
这似乎是一个重要的条件(如果这些集合不相等怎么办!)但很容易解决这个问题; 我在最后一节中讨论了如何做到这一点。
此处描述的算法版本使用了有用的零权重弧概念。 不幸的是,这个概念只有在我们解决最小化问题时才有意义(如果我们不是最大化我们的工人任务分配的利润,而是最小化此类分配的成本)。
幸运的是,通过将每个弧a
权重设置为Ma.datum.weight
,很容易将最大线性分配问题转化为最小线性分配问题,其中M=max({a.datum.weight for a in G.setOfArcs})
. 原始最大化问题的解决方案将与弧权重更改后的解决方案最小化问题相同。 因此,对于其余部分,假设我们进行了此更改。
Kuhn-Munkres 算法通过未加权二分图中的一系列最大匹配来解决加权二分图中的最小权重匹配。 如果我们在线性分配问题的有向图表示上找到一个完美匹配,并且如果匹配中每条弧的权重为零,那么我们找到了最小权重匹配,因为这个匹配表明有向图中的所有节点都已经由具有最低可能成本的弧匹配(根据先前的定义,没有成本可以低于 0)。
不能将其他弧添加到匹配中(因为所有节点都已匹配)并且不应从匹配中删除任何弧,因为任何可能的替换弧将至少具有相同的权重值。
如果我们找到G
的子图的最大匹配,它只包含零权重弧,并且它不是完美匹配,我们没有完整的解决方案(因为匹配不完美)。 但是,我们可以通过改变G.setOfArcs
中弧的权重来生成一个新的有向图H
,使得新的 0 权重弧出现,并且H
的最优解与G
的最优解相同。 由于我们保证在每次迭代中至少产生一个零权重弧,我们保证我们将在不超过|G.setOfNodes|^{2}=N^{2}次这样的迭代中达到完美匹配。
假设在bipartition bipartition
中, bipartition.firstPart
包含代表worker 的节点, bipartition.secondPart
代表代表task 的节点。
该算法首先生成一个新的有向图H
。 H.setOfNodes = G.setOfNodes
。 H
中的一些弧是从bipartition.firstPart
中的节点n
生成的。 每个这样的节点n
为bipartition.G.setOfArcs
中的每个弧a
在H.setOfArcs
中生成一个弧b
其中a.fromNode = n
或a.toNode = n
, b=Arc(a.fromNode, a.toNode, a.datum.weight - z)
其中z=min(x.datum.weight for x in G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) ))
。
H
中的更多弧是从bipartition.secondPart
中的节点n
生成的。 每个这样的节点n
为bipartition.G.setOfArcs
中的每个弧a
在H.setOfArcs
中生成一个弧b
其中a.fromNode = n
或a.toNode = n
, b=Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z))
其中z=min(x.datum.weight for x in G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) ))
。
KMA:接下来,形成一个新的有向图K
,它仅由H
的零权重弧和入射在这些弧上的节点组成。 在K
中的节点上形成bipartition
,然后使用solve_mbm( bipartition )
得到K
上的最大匹配( matching
)。 如果matching
是H
中的完美匹配( matching
中的弧都发生在H.setOfNodes
中的所有节点上),则matching
是线性分配问题的最优解。
否则,如果matching
不完美,则使用node_cover = get_min_node_cover(matching, bipartition(K))
生成K
的最小节点覆盖。 接下来,定义z=min({a.datum.weight for a in H.setOfArcs if a not in node_cover})
。 为 H.setOfArcs 中的 a 定义nodes = H.setOfNodes
, arcs1 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh-z)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) and (a.toNode not in node_cover)}
, arcs2 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) != (a.toNode not in node_cover)}
, arcs3 = {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh+z)) for a in H.setOfArcs if ( (a.fromNode in node_cover) and (a.toNode in node_cover)}
. 上一个表达式中的!=
符号充当 XOR 运算符。然后arcs = arcs1.union(arcs2.union(arcs3))
。接下来, H=DiGraph(nodes,arcs)
。返回标签KMA 。算法一直持续到产生完美匹配为止。这种匹配也是线性分配问题的解决方案。
def kuhn_munkres( bipartition ): nodes = bipartition.G.setOfNodes arcs = frozenset({}) for n in bipartition.firstPart: z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} ) arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) ) for n in bipartition.secondPart: z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} ) arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) ) H = DiGraph(nodes, arcs) assignment, value = dict({}), 0 not_done = True while( not_done ): zwarcs = frozenset( {a for a in H.setOfArcs if a.datum.weight == 0} ) znodes = frozenset( {n.fromNode for n in zwarcs} ).union( frozenset( {n.toNode for n in zwarcs} ) ) K = DiGraph(znodes, zwarcs) k_bipartition = bipartition(K) matching = solve_mbm( k_bipartition ) mnodes = frozenset({a.fromNode for a in matching}).union(frozenset({a.toNode for a in matching})) if( len(mnodes) == len(H.setOfNodes) ): for a in matching: assignment[ a.fromNode.uid ] = a.toNode.uid value = sum({a.datum.weight for a in matching}) not_done = False else: node_cover = get_min_node_cover(matching, bipartition(K)) z = min( frozenset( {a.datum.weight for a in H.setOfArcs if a not in node_cover} ) ) nodes = H.setOfNodes arcs1 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh-z)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) and (a.toNode not in node_cover)} ) arcs2 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) != (a.toNode not in node_cover)} ) arcs3 = frozenset( {Arc(a.fromNode,a.toNode,ArcWeightDatum(a.datum.weigh+z)) for a in H.setOfArcs if ( (a.fromNode in node_cover) and (a.toNode in node_cover)} ) arcs = arcs1.union(arcs2.union(arcs3)) H = DiGraph(nodes,arcs) return value, assignment
这个实现是O(N^{5})
,因为它在每次迭代中生成一个新的最大匹配matching
; 与Edmonds-Karp的前两个实现类似,可以修改此算法,以便跟踪匹配并智能地使其适应每次迭代。 完成后,复杂度变为O(N^{4})
。 该算法的更高级和更新版本(需要一些更高级的数据结构)可以在O(N^{3})
中运行。 上面更简单的实现和更高级的实现的细节可以在这篇博文中找到。
弧权重上的任何操作都不会修改算法返回的最终分配。 原因如下:由于我们的输入图始终是完整的二分图,因此解决方案必须通过这两个节点之间的弧将一个分区中的每个节点映射到第二个分区中的另一个节点。 请注意,对弧权重执行的操作永远不会改变在任何特定节点上发生的弧的顺序(按权重排序)。
因此,当算法在完美的完全二分匹配处终止时,每个节点都被分配一个零权重弧,因为在算法期间来自该节点的弧的相对顺序没有改变,并且由于零权重弧是最便宜的可能弧,并且完美的完全二分匹配保证每个节点都存在一个这样的弧。 这意味着生成的解决方案确实与原始线性分配问题的解决方案相同,而无需修改弧权重。
不平衡的分配
看起来该算法非常有限,因为如上所述,它仅在完整的二分图上运行(其中一半节点位于二分之一部分,另一半位于第二部分)。 在工人中,任务动机意味着有与任务一样多的工人(似乎非常有限)。
但是,有一个简单的转换可以消除此限制。 假设工人比任务少,我们添加一些虚拟工人(足以使结果图成为完整的二分图)。 Each dummy worker has an arc directed from the worker to each of the tasks. Each such arc has weight 0 (placing it in a matching gives no added profit). After this change the graph is a complete bipartite graph which we can solve for. Any task assigned a dummy worker is not initiated.
Suppose that there are more tasks than workers. We add some dummy tasks (enough to make the resulting graph a complete bipartite graph ). Each dummy task has an arc directed from each worker to the dummy task. Each such arc has a weight of 0 (placing it in a matching gives no added profit). After this change the graph is a complete bipartite graph which we can solve for. Any worker assigned to dummy task is not employed during the period.
A Linear Assignment Example
Finally, let's do an example with the code I've been using. I'm going to modify the example problem from here. We have 3 tasks: we need to clean the bathroom , sweep the floor , and wash the windows .
The workers available to use are Alice , Bob , Charlie , and Diane . Each of the workers gives us the wage they require per task. Here are the wages per worker:
Clean the Bathroom | Sweep the Floor | Wash the Windows | |
---|---|---|---|
爱丽丝 | 2美元 | 3 美元 | 3 美元 |
鲍勃 | 3 美元 | 2美元 | 3 美元 |
查理 | 3 美元 | 3 美元 | 2美元 |
黛安 | 9 美元 | 9 美元 | 1美元 |
If we want to pay the least amount of money, but still get all the tasks done, who should do what task? Start by introducing a dummy task to make the digraph representing the problem bipartite.
Clean the Bathroom | Sweep the Floor | Wash the Windows | 没做什么 | |
---|---|---|---|---|
爱丽丝 | 2美元 | 3 美元 | 3 美元 | $0 |
鲍勃 | 3 美元 | 2美元 | 3 美元 | $0 |
查理 | 3 美元 | 3 美元 | 2美元 | $0 |
黛安 | 9 美元 | 9 美元 | 1美元 | $0 |
Supposing that the problem is encoded in a digraph , then kuhn_munkres( bipartition(G) )
will solve the problem and return the assignment. It's easy to verify that the optimal (lowest cost) assignment will cost $5.
Here's a visualization of the solution the code above generates:
这就对了。 You now know everything you need to know about the linear assignment problem.
You can find all of the code from this article on GitHub.
进一步阅读 Toptal 工程博客:
- 使用 Python/NetworkX 进行图形数据科学