个性化阅读
专注于IT技术分析

最大流量和线性分配问题

本文概述

这是一个问题:你的企业指派承包商来履行合同。你查看你的花名册, 并确定可以聘用一个月的承包商, 然后查看可用的合同, 以查看其中哪些合同可以进行一个月的工作。既然你知道每个承包商可以有效地履行每个合同, 那么你如何分配承包商以最大化该月的整体效率?

这是分配问题的一个示例, 可以使用经典的匈牙利算法来解决该问题。

双向匹配

匈牙利算法(也称为Kuhn-Munkres算法)是一种多项式时间算法, 可使加权二部图中的权重匹配最大化。在这里, 承包商和合同可以建模为二部图, 其有效性作为承包商和合同节点之间的边缘权重。

在本文中, 你将学习使用Edmonds-Karp算法解决线性分配问题的匈牙利算法的实现。你还将学习Edmonds-Karp算法如何对Ford-Fulkerson方法进行轻微修改, 以及这种修改的重要性。

最大流量问题

最大流量问题本身可以非正式地描述为通过管道网络将某些流体或气体从单一来源移动到单一终端的问题。假设网络中的压力足以确保流体或气体不会在任何长度的管子或管件(在不同长度的管子汇合的地方)中滞留, 就可以做到这一点。

通过对图形进行某些更改, 分配问题可以转化为最大流量问题。

初赛

解决这些问题所需的想法出现在许多数学和工程学科中, 通常类似的概念以不同的名称已知并且以不同的方式表示(例如, 邻接矩阵和邻接列表)。由于这些想法非常深奥, 因此选择了在任何给定的环境中定义这些概念的方式。

本文将不带任何介绍性的集合论知识, 而不会假设任何先验知识。

本文中的实现将问题表示为有向图(图)。

有向图具有两个属性, setOfNodes和setOfArcs。这两个属性都是集合(无序集合)。在这篇文章的代码块中, 我实际上是在使用Python的Frozenset, 但这一细节并不是特别重要。

DiGraph = namedtuple('DiGraph', ['setOfNodes', 'setOfArcs'])

(注意:本文以及本文中的所有其他代码都是使用Python 3.6编写的。)

节点数

节点n由两个属性组成:

  • n.uid:唯一标识符。

    这意味着对于任意两个节点x和y,

x.uid != y.uid
  • n.datum:代表任何数据对象。
Node = namedtuple('Node', ['uid', 'datum'])

弧线

弧a由三个属性组成:

  • a.fromNode:这是一个如上定义的节点。

  • a.toNode:这是一个如上定义的节点。

  • a.datum:代表任何数据对象。

Arc = namedtuple('Arc', ['fromNode', 'toNode', 'datum'])

有向图中的一组弧表示有向图中的节点上的二进制关系。 arc a的存在意味着a.fromNode和a.toNode之间存在关系。

在有向图(与无向图相对)中, a.fromNode和a.toNode之间存在关系并不意味着a.toNode和a.fromNode之间存在相似关系。

这是因为在无向图中, 表达的关系不一定是对称的。

可以使用节点和弧来定义有向图, 但是为方便起见, 在以下算法中, 将有向图用字典表示。

这是一种可以将上面的图形表示转换成类似于此的字典表示的方法:

def digraph_to_dict(G):
    G_as_dict = dict([])
    for a in G.setOfArcs:
        if(a.fromNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            pdg(G)
            raise KeyError(err_msg)
        if(a.toNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            pdg(G)
            raise KeyError(err_msg)
        G_as_dict[a.fromNode] = (G_as_dict[a.fromNode].union(frozenset([a]))) if (a.fromNode in G_as_dict) else frozenset([a])
    for a in G.setOfArcs:
        if(a.fromNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        if a.toNode not in G_as_dict:
            G_as_dict[a.toNode] = frozenset([])
    return G_as_dict

这是另一个将其转换成字典的字典的方法, 它将非常有用:

def digraph_to_double_dict(G):
    G_as_dict = dict([])
    for a in G.setOfArcs:
        if(a.fromNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        if(a.toNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        if(a.fromNode not in G_as_dict):
             G_as_dict[a.fromNode] = dict({a.toNode : frozenset([a])})
        else:
            if(a.toNode not in G_as_dict[a.fromNode]):
                G_as_dict[a.fromNode][a.toNode] = frozenset([a])
            else:
                G_as_dict[a.fromNode][a.toNode] = G_as_dict[a.fromNode][a.toNode].union(frozenset([a]))

    for a in G.setOfArcs:
        if(a.fromNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        if a.toNode not in G_as_dict:
            G_as_dict[a.toNode] = dict({})
    return G_as_dict

当文章讨论由字典表示的有向图时, 将提到G_as_dict来引用它。

有时候, 通过图G的uid(唯一标识符)从图G中获取节点会很有帮助:

def find_node_by_uid(find_uid, G):
    nodes = {n for n in G.setOfNodes if n.uid == find_uid}
    if(len(nodes) != 1):
        err_msg = 'Node with uid {find_uid!s} is not unique.'.format(**locals())
        raise KeyError(err_msg)
    return nodes.pop()

在定义图形时, 人们有时会使用术语”节点”和”顶点”来指代相同的概念。弧和边也是如此。

Python中有两种流行的图形表示形式, 一种使用字典, 另一种使用对象表示图形。这篇文章中的表示形式介于这两种常用表示形式之间。

这是我的有向图。有很多类似的东西, 但这是我的。

步道

令S_Arcs是有向图G中弧的有限序列(有序集合), 使得如果a_除了最后一个弧在S_Arcs中是任何弧, 并且b在序列中跟随a, 则必须有一个节点n = a.fromNode在G.setOfNodes, 这样a.toNode = b.fromNode。

从S_Arcs中的第一个弧开始, 一直持续到S_Arcs中的最后一个弧, 收集(按照遇到的顺序)S_Arcs中每两个连续弧之间的所有节点n(如上定义)。标记在此操作S_ {Nodes}中收集的节点的有序集合。

def path_arcs_to_nodes(s_arcs):
    s_nodes = list([])
    arc_it = iter(s_arcs)
    step_count = 0
    last = None
    try:
        at_end = False
        last = a1 = next(arc_it)
        while (not at_end):
            s_nodes += [a1.fromNode]
            last = a2 = next(arc_it)
            step_count += 1
            if(a1.toNode != a2.fromNode):
                err_msg = "Error at index {step_count!s} of Arc sequence.".format(**locals())
                raise ValueError(err_msg)
            a1 = a2
    except StopIteration as e:
        at_end = True

    if(last is not None):
        s_nodes += [last.toNode]

    return s_nodes
  • 如果某个节点在序列S_Nodes中出现多次, 则称S_Arcs为有向图G上的走动。

  • 否则, 在图G上调用S_Arcs从list(S_Nodes)[0]到list(S_Nodes)[-1]的路径。

源节点

如果s在G.setOfNodes中, 并且G.setOfArcs不包含使a.toNode = s的弧, 则调用节点s是有向图G中的源节点。

def source_nodes(G):
    to_nodes = frozenset({a.toNode for a in G.setOfArcs})
    sources = G.setOfNodes.difference(to_nodes)
    return sources

终端节点

如果t在G.setOfNodes中并且G.setOfArcs不包含弧a, 则将节点t称为图G中的终端节点, 这样a.fromNode = t。

def terminal_nodes(G):
    from_nodes = frozenset({a.fromNode for a in G.setOfArcs})
    terminals = G.setOfNodes.difference(from_nodes)
    return terminals

割伤和s-t割伤

连通图G的割线是G.setOfArcs的弧的子集, 它分割G中的节点G.setOfNodes的集合。如果G.setOfNodes中的每个节点n都连接并且G中至少有一个弧a, 则G连接。 setOfArcs, 使得n = a.fromNode或n = a.toNode, 但a.fromNode!= a.toNode。

Cut = namedtuple('Cut', ['setOfCutArcs'])

上面的定义是指弧的子集, 但它也可以定义G.setOfNodes节点的分区。

对于函数predecessors_of和successors_of, n是有向图G的集合G.setOfNodes中的一个节点, 而cut是G的一个割:

def cut_predecessors_of(n, cut, G):
    allowed_arcs   = G.setOfArcs.difference(frozenset(cut.setOfCutArcs))
    predecessors   = frozenset({})
    previous_count = len(predecessors)
    reach_fringe   = frozenset({n})
    keep_going     = True
    while( keep_going ):
        reachable_from = frozenset({a.fromNode for a in allowed_arcs if (a.toNode in reach_fringe)})
        reach_fringe   = reachable_from
        predecessors   = predecessors.union(reach_fringe)
        current_count  = len(predecessors)
        keep_going     = current_count -  previous_count > 0
        previous_count = current_count
    return predecessors

def cut_successors_of(n, cut, G):
    allowed_arcs   = G.setOfArcs.difference(frozenset(cut.setOfCutArcs))
    successors     = frozenset({})
    previous_count = len(successors)
    reach_fringe   = frozenset({n})
    keep_going     = True
    while( keep_going ):
        reachable_from = frozenset({a.toNode for a in allowed_arcs if (a.fromNode in reach_fringe)})
        reach_fringe   = reachable_from
        successors     = successors.union(reach_fringe)
        current_count  = len(successors)
        keep_going     = current_count -  previous_count > 0
        previous_count = current_count
    return successors

让割为有向图G的割。

def get_first_part(cut, G):
    firstPartFringe = frozenset({a.fromNode for a in cut.setOfCutArcs})
    firstPart = firstPartFringe
    for n in firstPart:
        preds = cut_predecessors_of(n, cut, G)
        firstPart = firstPart.union(preds)
    return firstPart

def get_second_part(cut, G):
    secondPartFringe = frozenset({a.toNode for a in cut.setOfCutArcs})
    secondPart = secondPartFringe
    for n in secondPart:
        succs= cut_successors_of(n, cut, G)
        secondPart = secondPart.union(succs)
    return secondPart

如果满足以下条件, 则cut是有向图G的切割:(get_first_part(cut, G).union(get_second_part(cut, G))== G.setOfNodes)和(len(get_first_part(cut, G).intersect(get_second_part(cut, G)。 )))== 0)如果((get_first_part(cut, G)中的x和get_second_part(cut, G)中的y)和(x!= y), 则cut称为xy切割。当x-y切割中的节点x是源节点并且x-y切割中的节点y是终端节点时, 则此切割称为s-t切割。

STCut = namedtuple('STCut', ['s', 't', 'cut'])

流网络

你可以使用图G来表示流网络。

为每个节点分配n, 其中n在G.setOfNodes中为一个FlowNodeDatum的n.datum:

FlowNodeDatum = namedtuple('FlowNodeDatum', ['flowIn', 'flowOut'])

分配每个圆弧a, 其中a在G.setOfArcs中, 而a.datum是FlowArcDatum。

FlowArcDatum = namedtuple('FlowArcDatum', ['capacity', 'flow'])

flowNodeDatum.flowIn和flowNodeDatum.flowOut是正实数。

flowArcDatum.capacity和flowArcDatum.flow也是正实数。

对于G.setOfNodes中的每个节点节点n:

n.datum.flowIn = sum({a.datum.flow for a in G.Arcs if a.toNode == n})
n.datum.flowOut = sum({a.datum.flow for a in G.Arcs if a.fromNode == n})

图G现在代表一个流动网络。

G的流是指G中所有弧a的a。流。

可行流量

让图G代表一个流动网络。

在以下情况下, 以G表示的流动网络具有可行的流动:

  1. 对于G.setOfNodes中的每个节点, 除源节点和终端节点外, n:n.datum.flowIn = n.datum.flowOut。

  2. 对于G.setOfNodes中的每个弧, a.datum.capacity <= a.datum.flow。

条件1称为守恒约束。

条件2称为容量约束。

切割能力

由有向图G表示的流网络的源节点s和终端节点t的s-t cut stCut的剪切容量为:

def cut_capacity(stCut, G):
    part_1 = get_first_part(stCut.cut, G)
    part_2 = get_second_part(stCut.cut, G)
    s_part = part_1 if stCut.s in part_1 else part_2
    t_part = part_1 if stCut.t in part_1 else part_2
    cut_capacity = sum({a.datum.capacity for a in stCut.cut.setOfCutArcs if ( (a.fromNode in s_part) and (a.toNode in t_part) )})
    return cut_capacity

最小容量削减

令stCut = stCut(s, t, cut)是有向图G表示的流动网络的s-t切割。

如果在该流网络中没有其他s-t削减stCutCompetitor, 则stCut是用G表示的流网络的最小容量削减, 从而:

    cut_capacity(stCut, G) < cut_capacity(stCutCompetitor, G)

剥离流

我想提到的有向图是将有向图G并从G.setOfNodes中的所有节点以及G.setOfArcs中的所有弧中剥离所有流数据的结果。

def strip_flows(G):
    new_nodes= frozenset( (Node(n.uid, FlowNodeDatum(0.0, 0.0)) for n in G.setOfNodes) )
    new_arcs = frozenset([])
    for a in G.setOfArcs:
        new_fromNode = Node(a.fromNode.uid, FlowNodeDatum(0.0, 0.0))
        new_toNode     = Node(a.toNode.uid, FlowNodeDatum(0.0, 0.0))
        new_arc            = Arc(new_fromNode, new_toNode, FlowArcDatum(a.datum.capacity, 0.0))
        new_arcs          = new_arcs.union(frozenset([new_arc]))
    return DiGraph(new_nodes, new_arcs)

最大流量问题

在以下情况下, 表示为有向图G的流网络, G.setOfNodes中的源节点s和G.setOfNodes中的终端节点t可以表示最大流问题:

(len(list(source_nodes(G))) == 1) and (next(iter(source_nodes(G))) == s) and (len(list(terminal_nodes(G))) == 1) and (next(iter(terminal_nodes(G))) == t)

标记此表示形式:

MaxFlowProblemState = namedtuple('MaxFlowProblemState', ['G', 'sourceNodeUid', 'terminalNodeUid', 'maxFlowProblemStateUid'])

其中sourceNodeUid = s.uid, terminalNodeUid = t.uid, maxFlowProblemStateUid是问题实例的标识符。

最大流量解决方案

让maxFlowProblem代表最大流量问题。 maxFlowProblem的解决方案可以用表示为有向图H的流网络表示。

如果满足以下条件, 则图H是解决输入python maxFlowProblem上最大流量问题的可行解决方案:

  1. strip_flows(maxFlowProblem.G)== strip_flows(H)。

  2. H是一个流动网络, 具有可行的流动。

如果除了1和2:

  1. 有向图K表示的其他流网络不能使strip_flows(G)== strip_flows(K)和find_node_by_uid(t.uid, G).flowIn <find_node_by_uid(t.uid, K).flowIn。

那么H也是maxFlowProblem的最佳解决方案。

换句话说, 可用图表示可行的最大流量解:

  1. 与相应的最大流量问题的图G相同, 不同之处在于任何节点和弧的n.datum.flowIn, n.datum.flowOut和a.datum.flow可能不同。

  2. 表示具有可行流的流网络。

而且, 它还可以代表最佳的最大流量解决方案:

  1. 最大流量问题中与终端节点对应的节点的flowIn尽可能大(当条件1和2仍然满足时)。

如果图H表示可行的最大流量解:find_node_by_uid(s.uid, H).flowOut = find_node_by_uid(t.uid, H).flowIn从最大流量开始, 最小割定理(如下所述)。非正式地, 由于假定H具有可行的流, 这意味着在穿越任何(其他)节点时(保存约束), 既不能”创建”(除了在源节点s处), 也不能”销毁”(在终端节点t处除外)。

由于最大流量问题仅包含一个源节点s和一个终端节点t, 因此在s处”创建”的所有流量都必须在t处”销毁”, 否则流量网络不具有可行的流量(将违反保护约束条件) )。

令图H代表可行的最大流量解;上面的值称为H的s-t流量值。

让:

mfps=MaxFlowProblemState(H, maxFlowProblem.sourceNodeUid, maxFlowProblem.terminalNodeUid, maxFlowProblem.maxFlowProblemStateUid)

这意味着mfps是maxFlowProblem的后继状态, 这仅表示mfps与maxFlowProblem完全一样, 只是maxFlowProblem.setOfArcs中弧a的a.flow值与mfps中弧a的a.flow可能不同。 setOfArcs。

def get_mfps_flow(mfps):
    flow_from_s = find_node_by_uid(mfps.sourceNodeUid, mfps.G).datum.flowOut
    flow_to_t      = find_node_by_uid(mfps.terminalNodeUid, mfps.G).datum.flowIn

    if( (flow_to_t - flow_from_s) > 0):
        raise Exception('Infeasible s-t flow')
    return flow_to_t

这是mfps及其关联的maxFlowProb的可视化。图像中的每个弧a都有一个标签, 这些标签是a.datum.flowFrom / a.datum.flowTo, 图像中的每个节点n都有一个标签, 并且这些标签是n.uid {n.datum.flowIn / a .datum.flowOut}。

最大流量可视化

切割流量

令mfps代表MaxFlowProblemState, 让stCut代表mfps.G的片段。 stCut的剪切流定义为:

def get_stcut_flow(stCut, mfps):
    s = find_node_by_uid(mfps.sourceNodeUid, mfps.G)
    t = find_node_by_uid(mfps.terminalNodeUid, mfps.G)
    part_1 = get_first_part(stCut.cut, mfps.G)
    part_2 = get_second_part(stCut.cut, mfps.G)
    s_part = part_1 if s in part_1 else part_2
    t_part = part_1 if t in part_1 else part_2
    s_t_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in s_part) )
    t_s_sum = sum(a.datum.flow for a in mfps.G.setOfArcs if (a in stCut.cut.setOfCutArcs) and (a.fromNode in t_part) )
    cut_flow = s_t_sum - t_s_sum
    return cut_flow

s-t cut flow是从包含源节点的分区到包含终端节点的分区的流量总和减去从包含终端节点的分区到包含源节点的分区的流量总和。

最大流量, 最小切割

令maxFlowProblem表示最大流量问题, 并用图H表示的流量网络表示对maxFlowProblem的解。

令minStCut为以maxFlowProblem.G表示的流动网络的最小容量削减。

因为在最大流问题中, 流仅起源于单个源节点并终止于单个终端节点, 并且由于容量约束和守恒约束, 我们知道进入maxFlowProblem.terminalNodeUid的所有流都必须跨越任何切入点, 尤其是必须跨越minStCut。这表示:

get_flow_value(H, maxFlowProblem) <= cut_capacity(minStCut, maxFlowProblem.G)

解决最大流量问题

解决最大流量问题maxFlowProblem的基本思想是从图H表示的最大流量解决方案开始。这样的起点可以是H = strip_flows(maxFlowProblem.G)。然后的任务是使用H, 并通过对H.setOfArcs中某些弧a的a.datum.flow值进行一些贪婪的修改, 以生成由有向图K表示的另一个最大流量解, 使得K仍无法表示具有可行流的流网络。和get_flow_value(H, maxFlowProblem)<get_flow_value(K, maxFlowProblem)。只要此过程继续进行, 最近遇到的最大流量解决方案(K)的质量(get_flow_value(K, maxFlowProblem))就会比发现的其他任何最大流量解决方案都要好。如果过程达到了无法进行其他改进的程度, 则过程可以终止, 并且将返回最佳的最大流量解决方案。

上面的描述是一般性的, 并跳过了许多证明, 例如是否可以进行此过程或需要花费多长时间, 我将提供更多详细信息和算法。

最大流量, 最小割定理

在Ford和Fulkerson的《网络中的流》一书中, 最大流, 最小割定理(定理5.1)的陈述为:

对于任何网络, 从s到t的最大流量值等于将s和t分开的所有切口的最小切口容量。

使用本文中的定义, 可以将其转换为:

在以下情况下, 由以图H表示的流动网络表示的maxFlowProblem解决方案是最佳的:

get_flow_value(H, maxFlowProblem) == cut_capacity(minStCut, maxFlowProblem.G)

我喜欢这个定理的证明, 而维基百科还有另一个证明。

最大流量, 最小割定理用于证明福特-福克森方法的正确性和完整性。

补充路径后, 我还将在本节中提供一个定理的证明。

Ford-Fulkerson方法和Edmonds-Karp算法

CLRS定义了Ford-Fulkerson方法, 如下所示(第26.2节):

FORD-FULKERSON-METHOD(G, s, t):
initialize flow f to 0
while there exists an augmenting path p in the residual graph G_f augment flow f along

残差图

用图G表示的流动网络的残差图可以表示为图G_f:

ResidualDatum = namedtuple('ResidualDatum', ['residualCapacity', 'action'])

def agg_n_to_u_cap(n, u, G_as_dict):
    arcs_out = G_as_dict[n]
    return sum([a.datum.capacity for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ])

def agg_n_to_u_flow(n, u, G_as_dict):
    arcs_out = G_as_dict[n]
    return sum([a.datum.flow for a in arcs_out if( (a.fromNode == n) and (a.toNode == u) ) ])

def get_residual_graph_of(G):
    G_as_dict = digraph_to_dict(G)
    residual_nodes = G.setOfNodes
    residual_arcs = frozenset([])

    for n in G_as_dict:
        arcs_from = G_as_dict[n]
        nodes_to = frozenset([find_node_by_uid(a.toNode.uid, G) for a in arcs_from])

        for u in nodes_to:
            n_to_u_cap_sum  = agg_n_to_u_cap(n, u, G_as_dict)
            n_to_u_flow_sum = agg_n_to_u_flow(n, u, G_as_dict)
            if(n_to_u_cap_sum > n_to_u_flow_sum):
                flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL)
                residual_arcs = residual_arcs.union( frozenset([Arc(n, u, datum=ResidualDatum(flow, 'push'))]) )
            if(n_to_u_flow_sum > 0.0):
                flow = round(n_to_u_flow_sum, TOL)
                residual_arcs = residual_arcs.union( frozenset([Arc(u, n, datum=ResidualDatum(flow, 'pull'))]) )
    return DiGraph(residual_nodes, residual_arcs)
  • agg_n_to_u_cap(n, u, G_as_dict)返回G.setOfArcs子集中所有弧的a.datum.capacity的总和, 如果a.fromNode = n并且a.toNode = u, 则弧a在子集中。

  • agg_n_to_u_cap(n, u, G_as_dict)返回G.setOfArcs子集中所有弧的a.datum.flow之和, 如果a.fromNode = n和a.toNode = u, 则弧a在子集中。

简而言之, 残差图G_f表示可以对有向图G执行的某些动作。

有向图G表示的流动网络的G.setOfNodes中的每对节点n, u可以在G的剩余图G_f中生成0、1或2个弧。

  1. 如果在G.setOfArcs中没有弧a, 使得a.fromNode = n和a.toNode = u, 则对n, u在G_f中不生成弧。

  2. 对n, u在G_f.setOfArcs中生成弧a, 其中a表示标记为从n到u的推流弧的弧a = Arc(n, u, datum = ResidualNode(n_to_u_cap_sum-n_to_u_flow_sum))如果n_to_u_cap_sum> n_to_u_flow_sum。

  3. 对n, u在G_f.setOfArcs中生成弧a, 其中a表示标记为从n到u的拉流弧的弧a = Arc(n, u, datum = ResidualNode(n_to_u_cap_sum-n_to_u_flow_sum))如果n_to_u_flow_sum> 0.0。

  • G_f.setOfArcs中的每个推流弧均表示将总计x <= n_to_u_cap_sum-n_to_u_flow_sum流添加到G.setOfArcs子集中的弧的动作, 如果a.fromNode = n和a.toNode =, 则弧a在子集中。你

  • G_f.setOfArcs中的每个拉流弧表示将x。= n_to_u_flow_sum流的总数减去g.setOfArcs子集中的弧的动作, 如果a.fromNode = n并且a.toNode = u, 则弧a在子集中。

对G中适用的弧线从G_f执行单独的推或拉动作可能会生成没有可行流的流网络, 因为在生成的流网络中可能会违反容量约束或守恒约束。

这是上一示例中最大流量解决方案的可视化残差图的可视化效果, 在可视化效果中, 每个圆弧a代表a.residualCapacity。

最大流量可视化(剩余)

增强路径

令maxFlowProblem为最大流量问题, 令G_f = get_residual_graph_of(G)为maxFlowProblem.G的残差图。

maxFlowProblem的扩充路径增强路径是从find_node_by_uid(maxFlowProblem.sourceNode, G_f)到find_node_by_uid(maxFlowProblem.terminalNode, G_f)的任何路径。

事实证明, 可以将扩展路径expandingPath应用于图H表示的最大流量解, 生成另一个由图K表示的最大流量解, 其中, 如果H不是最优的, 则get_flow_value(H, maxFlowProblem)<get_flow_value(K, maxFlowProblem)。

这是如何做:

def augment(augmentingPath, H):
    augmentingPath = list(augmentingPath)
    H_as_dict = digraph_to_dict(H)
    new_nodes = frozenset({})
    new_arcs = frozenset({})
    visited_nodes  = frozenset({})
    visited_arcs = frozenset({})

    bottleneck_residualCapacity = min( augmentingPath, key=lambda a: a.datum.residualCapacity ).datum.residualCapacity
    for x in augmentingPath:
        from_node_uid = x.fromNode.uid if x.datum.action == 'push' else x.toNode.uid
        to_node_uid = x.toNode.uid   if x.datum.action == 'push' else x.fromNode.uid
        from_node = find_node_by_uid( from_node_uid, H )
        to_node = find_node_by_uid( to_node_uid, H )
        load = bottleneck_residualCapacity if x.datum.action == 'push' else -bottleneck_residualCapacity

        for a in H_as_dict[from_node]:
            if(a.toNode == to_node):
                test_sum = a.datum.flow + load
                new_flow = a.datum.flow
                new_from_node_flow_out = a.fromNode.datum.flowOut
                new_to_node_flow_in = a.toNode.datum.flowIn

                new_from_look = {n for n in new_nodes if (n.uid == a.fromNode.uid)}
                new_to_look = {n for n in new_nodes if (n.uid == a.toNode.uid)  }
                prev_from_node = new_from_look.pop() if (len(new_from_look)>0) else a.fromNode
                prev_to_node = new_to_look.pop()   if (len(new_to_look)>0)   else a.toNode
                new_nodes = new_nodes.difference(frozenset({prev_from_node}))
                new_nodes = new_nodes.difference(frozenset({prev_to_node}))

                if(test_sum > a.datum.capacity):
                    new_flow = a.datum.capacity
                    new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + a.datum.capacity
                    new_to_node_flow_in    = prev_to_node.datum.flowIn - a.datum.flow + a.datum.capacity
                    load = test_sum - a.datum.capacity
                elif(test_sum < 0.0):
                    new_flow = 0.0
                    new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow
                    new_to_node_flow_in    = prev_to_node.datum.flowIn - a.datum.flow
                    load = test_sum
                 else:
                    new_flow = test_sum
                    new_from_node_flow_out = prev_from_node.datum.flowOut - a.datum.flow + new_flow
                    new_to_node_flow_in    = prev_to_node.datum.flowIn - a.datum.flow + new_flow
                    load = 0.0

                new_from_node_flow_out = round(new_from_node_flow_out, TOL)
                new_to_node_flow_in    = round(new_to_node_flow_in, TOL)
                new_flow               = round(new_flow, TOL)

                new_from_node = Node(prev_from_node.uid, FlowNodeDatum(prev_from_node.datum.flowIn, new_from_node_flow_out))
                new_to_node   = Node(prev_to_node.uid, FlowNodeDatum(new_to_node_flow_in, prev_to_node.datum.flowOut))
                new_arc       = Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, new_flow))

                visited_nodes = visited_nodes.union(frozenset({a.fromNode, a.toNode}))
                visited_arcs  = visited_arcs.union(frozenset({a}))
                new_nodes     = new_nodes.union(frozenset({new_from_node, new_to_node}))
                new_arcs      = new_arcs.union(frozenset({new_arc}))

    not_visited_nodes = H.setOfNodes.difference(visited_nodes)
    not_visited_arcs  = H.setOfArcs.difference(visited_arcs)

    full_new_nodes = new_nodes.union(not_visited_nodes)
    full_new_arcs  = new_arcs.union(not_visited_arcs)
    G = DiGraph(full_new_nodes, full_new_arcs)

    full_new_arcs_update = frozenset([])
    for a in full_new_arcs:
        new_from_node = a.fromNode
        new_to_node   = a.toNode
        new_from_node = find_node_by_uid( a.fromNode.uid, G )
        new_to_node   = find_node_by_uid( a.toNode.uid, G )
        full_new_arcs_update = full_new_arcs_update.union( {Arc(new_from_node, new_to_node, FlowArcDatum(a.datum.capacity, a.datum.flow))} )

    G = DiGraph(full_new_nodes, full_new_arcs_update)

    return G

在上面, TOL是一些舍入值, 用于舍入网络中的流量值。这是为了避免级联不精确的浮点计算。因此, 例如, 我使用TOL = 10表示舍入到10个有效数字。

令K =增量(augmentingPath, H), 则K表示maxFlowProblem的可行最大流量解决方案。为了使陈述正确, 以K表示的流动网络必须具有可行的流动(不违反容量约束或守恒约束)。

原因如下:在上述方法中, 添加到图K表示的新流网络中的每个节点都是图H的节点的精确副本, 或者是添加到n.datum.flowIn中具有相同编号的节点n。其n.datum.flowOut。这意味着只要在H中满足, 就在K中满足守恒约束。之所以满足守恒约束, 是因为我们明确地检查网络中任何新的弧a是否具有a.datum.flow <= a.datum.capacity。因此, 只要来自集合H.setOfArcs的弧被原样复制到K.setOfArcs中, 就不会违反容量约束, 那么K不会违反容量约束。

如果H不是最佳值, 则get_flow_value(H, maxFlowProblem)<get_flow_value(K, maxFlowProblem)也是正确的。

原因如下:要使扩展路径增强路径存在于最大流量问题maxFlowProblem的残差图G_f的图表示中, 则增强路径上的最后一个弧必须为”推”弧, 并且必须具有a.toNode == maxFlowProblem。 terminalNode。增强路径定义为终止于最大流量问题的终端节点的路径, 对于该路径, 它是增强路径。从残差图的定义中可以明显看出, 该残差图上扩展路径中的最后一个弧必须是”推”弧, 因为扩展路径中的任何”拉”弧b都将具有b.toNode == maxFlowProblem。 terminalNode和b.fromNode!= maxFlowProblem.terminalNode来自路径的定义。另外, 从路径的定义中可以明显看出, 终端节点仅可通过增强方法修改一次。因此, expand会一次修改maxFlowProblem.terminalNode.flowIn的值, 并且会增加maxFlowProblem.terminalNode.flowIn的值, 这是因为增强路径中的最后一个弧必须是在扩增过程中导致对maxFlowProblem.terminalNode.flowIn进行修改的弧。从适用于”推”弧的增强定义, maxFlowProblem.terminalNode.flowIn只能增加, 而不能减少。

Sedgewick和Wayne的一些证明

Robert Sedgewich和Kevin Wayne所著的《算法》第四版有一些精彩而简短的证明(第892-894页), 将很有用。我将在这里重新创建它们, 尽管我会在以前的定义中使用适合的语言。我的证明标签与Sedgewick书中的标签相同。

命题E:对于任何表示最大流量问题maxFlowProblem的可行最大流量解的有向图H, 对于任何stCut get_stcut_flow(stCut, H, maxFlowProblem)= get_flow_value(H, maxFlowProblem)。

证明:让stCut = stCut(maxFlowProblem.sourceNode, maxFlowProblem.terminalNode, set([如果a.toNode == maxFlowProblem.terminalNode, 则为H.setOfArcs中的a表示)))。命题E直接根据s-t流量值的定义适用于stCut。假设我们希望将某些节点n从s分区(get_first_part(stCut.cut, G))移至t分区(get_second_part(stCut.cut, G)), 为此, 我们需要更改stCut .cut, 它可能会更改stcut_flow = get_stcut_flow(stCut, H, maxFlowProblem)并使命题E无效。但​​是, 让我们看看stcut_flow的值在进行此更改时将如何更改。节点n处于平衡状态, 这意味着流入节点n的总流量等于流出节点n的总流量(这对于H表示可行解是必要的)。请注意, 属于stcut_flow输入节点n的所有流都从s分区进入(直接或间接从t分区进入节点n的流将不会被计入stcut_flow值, 因为它的方向错误根据定义)。另外, 所有退出n的流最终(直接或间接)将流到终端节点(先前已证明)。当我们将节点n移到t分区中时, 所有从s分区进入n的流都必须添加到新的stcut_flow值中;但是, 必须从新的stcut_flow值中减去所有退出n的流;减去直接进入t分区的流的一部分, 因为此流现在在新的t分区内部, 并且不计为stcut_flow。还必须从stcut_flow中减去从n头到s分区中节点的流的一部分:将n移入t分区后, 这些流将从t分区定向到s分区, 依此类推。一定不要在stcut_flow中考虑, 因为这些流量已被除去, 流入s分区的流量必须减少这些流量的总和, 以及从s分区到t分区的流出流量(所有流量从st必须结束)必须减少相同的数量。由于节点n在此过程之前处于平衡状态, 因此更新将为其减去的新stcut_flow值添加相同的值, 从而使命题E在更新后为true。命题E的有效性来自对t分区大小的归纳。

以下是一些流动网络示例, 可帮助形象化命题E成立的不太明显的情况。在图像中, 红色区域表示s分区, 蓝色区域表示t分区, 绿色弧线表示s-t切割。在第二张图中, 节点A和节点B之间的流量增加, 而流入终端节点t的流量不变。

最大流量和线性分配问题4
最大流量和线性分配问题5

结论:s-t切割流量值不能超过任何s-t切割的能力。

命题F.(最大流量, 最小割定理):设f为s-t流量。以下3个条件是等效的:

  1. 存在一个s-t切口, 其容量等于流量f的值。

  2. f是最大流量。

  3. 关于f没有增加路径。

条件1暗示了条件2。条件2意味着条件3, 因为存在扩展路径意味着存在具有更大值的流, 这与f的最大值相矛盾。条件3表示条件1:令C_s为残差图中可以通过增加路径从s到达的所有节点的集合。令C_t为剩余弧, 则t必须在C_t中(根据我们的假设)。然后, 从C_s到C_t的弧段形成一个s-t切口, 其中仅包含a.datum.capacity = a.datum.flow或a.datum.flow = 0.0的弧段。如果不是这样, 则通过弧将剩余容量连接到C_s的节点将位于集合C_s中, 因为此时将存在从s到该节点的增加路径。穿过s-t切口的流量等于s-t切口的容量(因为从C_s到C_t的弧的流量等于容量), 也等于s-t流量的值(通过命题E)。

关于最大流, 最小割定理的陈述暗示了网络流中较早的陈述。

推论(完整性属性):当容量为整数时, 存在一个整数值的最大流量, 而Ford-Fulkerson算法找到了它。

证明:每条增径将流量增加一个正整数, “推”弧中未使用容量的最小值和”推”弧中的未使用容量的最小值, 所有这些始终都是正整数。

这证明了CLRS对Ford-Fulkerson方法的描述是正确的。该方法是不断寻找扩充路径, 并将扩充应用于具有更好解决方案的最新maxFlowSolution, 直到不再有扩充路径, 这意味着最新的最大流量解决方案是最佳的。

从福特福克森到埃德蒙兹卡普

有关解决最大流量问题的其余问题是:

  1. 应该如何构建扩展路径?

  2. 如果我们使用实数而不是整数, 该方法会终止吗?

  3. 终止需要多长时间(如果终止)?

Edmonds-Karp算法指定通过残差图的广度优先搜索(BFS)构造每个扩展路径;事实证明, 以上第1点的决定还将强制算法终止(第2点), 并确定渐近时间和空间复杂度。

首先, 这是一个BFS实现:

def bfs(sourceNode, terminalNode, G_f):
    G_f_as_dict = digraph_to_dict(G_f)
    parent_arcs = dict([])
    visited = frozenset([])
    deq = deque([sourceNode])
    while len(deq) > 0:
        curr = deq.popleft()
        if curr == terminalNode:
            break
        for a in G_f_as_dict.get(curr):
            if (a.toNode not in visited):
                visited = visited.union(frozenset([a.toNode]))
                parent_arcs[a.toNode] = a
                deq.extend([a.toNode])
    path = list([])
    curr = terminalNode
    while(curr != sourceNode):
        if (curr not in parent_arcs):
            err_msg = 'No augmenting path from {} to {}.'.format(sourceNode.uid, terminalNode.uid)
            raise StopIteration(err_msg)
        path.extend([parent_arcs[curr]])
        curr = parent_arcs[curr].fromNode
    path.reverse()

    test = deque([path[0].fromNode])
    for a in path:
        if(test[-1] != a.fromNode):
            err_msg = 'Bad path: {}'.format(path)
            raise ValueError(err_msg)
        test.extend([a.toNode])

    return path

我使用了python collections模块中的双端队列。

为了回答上面的问题2, 我将解释Sedgewick和Wayne的另一个证明:命题G。具有N个节点和A个弧的Edmonds-Karp算法中所需的增广路径数最多为NA / 2。证明:每个扩充路径都有一个瓶颈弧-从残差图中删除的弧, 因为它对应于填充为容量的”推”弧或流变为0的”拉”弧。圆弧成为瓶颈圆弧, 通过它的任何扩展路径的长度必须增加2倍。这是因为路径中的每个节点可能仅出现一次或根本不出现(根据路径的定义)。从最短路径到最长路径的探索意味着, 经过特定瓶颈节点的下一路径必须至少再访问一个节点, 这意味着在我们到达该节点之前, 路径上还会有另外两个弧。由于扩展路径的长度最多为N, 因此每个弧最多可以位于N / 2个扩展路径上, 并且扩展路径的总数最多为NA / 2。

Edmonds-Karp算法在O(NA ^ 2)中执行。如果在算法期间最多探索NA / 2条路径, 并且使用BFS探索每条路径为N + A, 则乘积的最高有效项, 因此, 渐近复杂度为O(NA ^ 2)。

令mfp为maxFlowProblemState。

def edmonds_karp(mfp):
    sid, tid = mfp.sourceNodeUid, mfp.terminalNodeUid
    no_more_paths = False
    loop_count = 0
    while(not no_more_paths):
       residual_digraph = get_residual_graph_of(mfp.G)
       try:
           asource = find_node_by_uid(mfp.sourceNodeUid, residual_digraph)
           aterm   = find_node_by_uid(mfp.terminalNodeUid, residual_digraph)
           apath   = bfs(asource, aterm, residual_digraph)
           paint_mfp_path(mfp, loop_count, apath)
           G = augment(apath, mfp.G)
           s = find_node_by_uid(sid, G)
           t = find_node_by_uid(tid, G)
           mfp = MaxFlowProblemState(G, s.uid, t.uid, mfp.maxFlowProblemStateUid)
           loop_count += 1
       except StopIteration as e:
           no_more_paths = True
    return mfp

上面的版本比O(NA ^ 2)效率低下, 并且复杂度比O(NA ^ 2)低, 因为它每次都构造一个新的最大流量解和一个新的残差图(而不是随着算法的发展而修改现有的图。为了获得真正的O(NA ^ 2)解, 该算法必须同时维护表示最大流量问题状态的有向图及其相关的残差图。因此, 算法必须避免不必要地遍历弧和节点, 并仅在必要时更新残差图中的值和关联值。

为了编写更快的Edmonds Karp算法, 我从上面重写了几段代码。我希望阅读生成新的有向图的代码有助于理解正在发生的事情。在快速算法中, 我使用了一些我不想详细介绍的新技巧和Python数据结构。我会说a.fromNode和a.toNode现在被视为字符串和节点的uid。对于此代码, 令mfps为maxFlowProblemState

import uuid

def fast_get_mfps_flow(mfps):
    flow_from_s = {n for n in mfps.G.setOfNodes if n.uid == mfps.sourceNodeUid}.pop().datum.flowOut
    flow_to_t   = {n for n in mfps.G.setOfNodes if n.uid == mfps.terminalNodeUid}.pop().datum.flowIn

    if( (flow_to_t - flow_from_s) > 0.00):
        raise Exception('Infeasible s-t flow')
    return flow_to_t

def fast_e_k_preprocess(G):
    G = strip_flows(G)
    get = dict({})
    get['nodes'] = dict({})
    get['node_to_node_capacity'] = dict({})
    get['node_to_node_flow'] = dict({})
    get['arcs'] = dict({})
    get['residual_arcs'] = dict({})

    for a in G.setOfArcs:
        if(a.fromNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.fromNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        if(a.toNode not in G.setOfNodes):
            err_msg = 'There is no Node {a.toNode.uid!s} to match the Arc from {a.fromNode.uid!s} to {a.toNode.uid!s}'.format(**locals())
            raise KeyError(err_msg)
        get['nodes'][a.fromNode.uid] = a.fromNode
        get['nodes'][a.toNode.uid]   = a.toNode
        lark = Arc(a.fromNode.uid, a.toNode.uid, FlowArcDatumWithUid(a.datum.capacity, a.datum.flow, uuid.uuid4()))
        if(a.fromNode.uid not in get['arcs']):
            get['arcs'][a.fromNode.uid] = dict({a.toNode.uid : dict({lark.datum.uid : lark})})
        else:
            if(a.toNode.uid not in get['arcs'][a.fromNode.uid]):
                get['arcs'][a.fromNode.uid][a.toNode.uid] = dict({lark.datum.uid : lark})
            else:
                get['arcs'][a.fromNode.uid][a.toNode.uid][lark.datum.uid] = lark

    for a in G.setOfArcs:
        if a.toNode.uid not in get['arcs']:
            get['arcs'][a.toNode.uid] = dict({})

    for n in get['nodes']:
        get['residual_arcs'][n] = dict()
        get['node_to_node_capacity'][n] = dict()
        get['node_to_node_flow'][n] = dict()
        for u in get['nodes']:
            n_to_u_cap_sum  = sum(a.datum.capacity for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) )
            n_to_u_flow_sum = sum(a.datum.flow     for a in G.setOfArcs if (a.fromNode.uid == n) and (a.toNode.uid == u) )

            if(n_to_u_cap_sum > n_to_u_flow_sum):
                flow = round(n_to_u_cap_sum - n_to_u_flow_sum, TOL)
                get['residual_arcs'][n][u] = Arc(n, u, ResidualDatum(flow, 'push'))
            if(n_to_u_flow_sum > 0.0):
                flow = round(n_to_u_flow_sum, TOL)
                get['residual_arcs'][u][n] = Arc(u, n, ResidualDatum(flow, 'pull'))

            get['node_to_node_capacity'][n][u] = n_to_u_cap_sum
            get['node_to_node_flow'][n][u]     = n_to_u_flow_sum

    return get

def fast_bfs(sid, tid, get):
    parent_of   = dict([])
    visited     = frozenset([])
    deq         = coll.deque([sid])
    while len(deq) > 0:
        n = deq.popleft()
        if n == tid:
            break
        for u in get['residual_arcs'][n]:
            if (u not in visited):
                visited = visited.union(frozenset({u}))
                parent_of[u] = get['residual_arcs'][n][u]
                deq.extend([u])
    path = list([])
    curr = tid
    while(curr != sid):
        if (curr not in parent_of):
            err_msg = 'No augmenting path from {} to {}.'.format(sid, curr)
            raise StopIteration(err_msg)
        path.extend([parent_of[curr]])
        curr = parent_of[curr].fromNode
    path.reverse()
    return path

def fast_edmonds_karp(mfps):
    sid, tid = mfps.sourceNodeUid, mfps.terminalNodeUid
    get = fast_e_k_preprocess(mfps.G)
    no_more_paths, loop_count = False, 0
    while(not no_more_paths):
       try:
           apath   = fast_bfs(sid, tid, get)
           get = fast_augment(apath, get)
           loop_count += 1
       except StopIteration as e:
           no_more_paths = True
    nodes = frozenset(get['nodes'].values())
    arcs = frozenset({})
    for from_node in get['arcs']:
        for to_node in get['arcs'][from_node]:
            for arc in get['arcs'][from_node][to_node]:
                arcs |= frozenset({get['arcs'][from_node][to_node][arc]})

    G = DiGraph(nodes, arcs)
    mfps = MaxFlowProblemState(G, sourceNodeUid=sid, terminalNodeUid=tid, maxFlowProblemStateUid=mfps.maxFlowProblemStateUid)
    return mfps

这是该算法如何从上方解决示例流量网络的图表。可视化显示了这些步骤, 它们反映在代表最新流网络的图G中, 也反映在该流网络的残差图中。残差图中的增强路径显示为红色路径, 而表示问题的有向图以绿色突出显示受给定增强路径影响的节点和弧的集合。在每种情况下, 我都会突出显示将要更改的图形部分(红色或绿色), 然后在更改后显示图形(仅黑色)。

最大流量可视化
最大流量可视化(剩余)

这是该算法如何解决其他示例流网络的另一种可视化形式。注意, 这个使用实数, 并且包含具有相同fromNode和toNode值的多个弧。

**还请注意, 由于带有”拉”残差基准的圆弧可能是增强路径的一部分, 因此在飞行网络的DiGraph中受影响的节点可能不在G!中的路径上!

二部图

假设我们有一个有向图G, 如果可以将G.setOfNodes中的节点划分为两个集合(part_1和part_2), 则G是二分的, 这样对于G.setOfArcs中的任何弧a来说, part_1和part_1中的a.toNode。 part_2中的a.fromNode和part_2中的a.toNode也是不正确的。

换句话说, 如果G可以划分为两组节点, 则每条弧必须将一组中的一个节点连接到另一组中的一个节点, 则G是二分的。

测试二分

假设我们有一个有向图G, 我们想测试它是否是二部图。我们可以通过将图形贪婪地着色为两种颜色, 在O(| G.setOfNodes | + | G.setOfArcs |)中进行此操作。

首先, 我们需要生成一个新的有向图H。该图将具有与G相同的节点集, 但是它将具有比G多的弧。G中的每个弧a将在H中创建2个弧;第一个弧将与a相同, 第二个弧将使a的指向矢反转(b = Arc(a.toNode, a.fromNode, a.datum))。

Bipartition = coll.namedtuple('Bipartition', ['firstPart', 'secondPart', 'G'])
def bipartition(G):
    nodes = frozenset(G.setOfNodes
    arcs  = frozenset(G.setOfArcs)
    arcs = arcs.union( frozenset( {Arc(a.toNode, a.fromNode, a.datum) for a in G.setOfArcs} ) )
    H = DiGraph(nodes, arcs)
    H_as_dict = digraph_to_dict(H)
    color = dict([])
    some_node = next(iter(H.setOfNodes))
    deq = coll.deque([some_node])
    color[some_node] = -1
    while len(deq) > 0:
        curr = deq.popleft()
        for a in H_as_dict.get(curr):
            if (a.toNode not in color):
                color[a.toNode] = -1*color[curr]
                deq.extend([a.toNode])
            elif(color[curr] == color[a.toNode]):
                print(curr, a.toNode)
                raise Exception('Not Bipartite.')

    firstPart  = frozenset( {n for n in color if color[n] == -1 } )
    secondPart = frozenset( {n for n in color if color[n] == 1 } )

    if( firstPart.union(secondPart) != G.setOfNodes ):
        raise Exception('Not Bipartite.')

    return Bipartition(firstPart, secondPart, G)

匹配项和最大匹配项

假设我们有一个有向图G, 并且匹配项是G.setOfArcs中弧的子集。如果对于匹配中的任意两个弧a和b, 则匹配是匹配:len(frozenset({a.fromNode}).union({a.toNode}).union({b.fromNode}).union({b.toNode }))== 4.换句话说, 匹配项中没有两个弧共享一个节点。

如果G中没有其他匹配的alt_matching使得len(matching)<len(alt_matching), 则matching matching为最大匹配。换句话说, 如果匹配是G.setOfArcs中最大的弧组, 但仍满足匹配的定义, 则匹配为最大匹配(添加任何尚未包含在匹配中的弧都将破坏匹配的定义)。

如果G.setOfArcs中的每个节点n在匹配中存在一个弧a, 其中a.fromNode == n或a.toNode == n, 则最大匹配匹配是一个完美匹配。

最大二分匹配

最大二分匹配是二分图G上的最大匹配。

假设G是二分的, 则找到最大二分匹配的问题可以转化为最大流问题, 可以使用Edmonds-Karp算法解决, 然后可以将最大二分匹配从求解中恢复为最大流问题。

令bipartition为G的二分法。

为此, 我需要生成一个新的有向图(H), 其中包含一些新节点(H.setOfNodes)和一些新弧线(H.setOfArcs)。 H.setOfNodes包含G.setOfNodes中的所有节点, 以及另外两个节点s(源节点)和t(终端节点)。

H.setOfArcs将为每个G.setOfArcs包含一个弧。如果弧a在G.setOfArcs中, 而a.fromNode在bipartition.firstPart中, 而a.toNode在bipartition.secondPart中, 则在H中包含a(添加FlowArcDatum(1, 0))。

如果a.fromNode在bipartition.secondPart中, 而a.toNode在bipartition.firstPart中, 则在H.setOfArcs中包括Arc(a.toNode, a.fromNode, FlowArcDatum(1, 0))。

二分图的定义可确保没有弧连接两个节点在同一分区中的任何节点。 H.setOfArcs还包含从节点s到bipartition.firstPart中的每个节点的弧。最后, H.setOfArcs在bipartition.secondPart中的每个节点到节点t都包含一条弧。 H.setOfArcs中所有a的a.datum.capacity = 1。

首先将G.setOfNodes中的节点划分为两个不相交的集合(第1部分和第2部分), 以使G.setOfArcs中没有弧从一个集合指向同一集合(此划分是可能的, 因为G是二分的)。接下来, 将G.setOfArcs中所有从第1部分到第2部分定向的弧添加到H.setOfArcs中。然后创建单个源节点s和单个终端节点t并创建更多弧

然后, 构造一个maxFlowProblemState。

def solve_mbm( bipartition ):
    s = Node(uuid.uuid4(), FlowNodeDatum(0, 0))
    t = Node(uuid.uuid4(), FlowNodeDatum(0, 0))

    translate = {}
    arcs = frozenset([])
    for a in bipartition.G.setOfArcs:
        if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) ):
            fark = Arc(a.fromNode, a.toNode, FlowArcDatum(1, 0))
            arcs = arcs.union({fark})
            translate[frozenset({a.fromNode.uid, a.toNode.uid})] = a
        elif ( (a.toNode in bipartition.firstPart) and (a.fromNode in bipartition.secondPart) ):
            bark = Arc(a.toNode, a.fromNode, FlowArcDatum(1, 0))
            arcs = arcs.union({bark})
            translate[frozenset({a.fromNode.uid, a.toNode.uid})] = a
    arcs1 = frozenset( {Arc(s, n, FlowArcDatum(1, 0)) for n in bipartition.firstPart } )
    arcs2 = frozenset( {Arc(n, t, FlowArcDatum(1, 0)) for n in bipartition.secondPart } )
    arcs = arcs.union(arcs1.union(arcs2))

    nodes = frozenset( {Node(n.uid, FlowNodeDatum(0, 0)) for n in bipartition.G.setOfNodes} ).union({s}).union({t})
    G = DiGraph(nodes, arcs)
    mfp = MaxFlowProblemState(G, s.uid, t.uid, 'bipartite')
    result = edmonds_karp(mfp)

    lookup_set = {a for a in result.G.setOfArcs if (a.datum.flow > 0) and (a.fromNode.uid != s.uid) and (a.toNode.uid != t.uid)}
    matching = {translate[frozenset({a.fromNode.uid, a.toNode.uid})] for a in lookup_set}

    return matching

最小节点覆盖

有向图G中的节点覆盖是G.setOfNodes的一组节点(覆盖), 因此对于G.setOfArcs的任何弧a, 它都必须为true :(覆盖中的a.fromNode)或(覆盖中的a.toNode)。

最小的节点覆盖范围是图中仍然是节点覆盖范围的可能的最小节点集。柯尼希定理指出, 在二分图中, 该图上最大匹配的大小等于最小节点覆盖的大小, 并建议如何从最大匹配中恢复节点覆盖:

假设我们有二分法bipartition和最大匹配法则。定义一个新的有向图H, H.setOfNodes = G.setOfNodes, H.setOfArcs中的弧是两个集合的并集。

第一组是匹配的圆弧a, 其变化是, 如果在bipartition.firstPart中的a.fromNode和bipartition.secondPart中的a.toNode在创建的圆弧中交换a.fromNode和a.toNode, 则这样的圆弧将a.datum .inMatching = True属性, 指示它们是从匹配中的圆弧派生的。

第二组是匹配中的非圆弧, 其变化是, 如果在bipartition.secondPart中的a.fromNode和bipartition.firstPart中的a.toNode则在创建的圆弧中交换a.fromNode和a.toNode(给定这样的圆弧a .datum.inMatching = False属性)。

接下来, 从bipartition.firstPart中的每个节点n开始运行深度优先搜索(DFS), 该深度既不是匹配的任何弧a的n == a.fromNode也不是n == a.toNodes。在DFS期间, 访问了一些节点, 而没有访问某些节点(将此信息存储在n.datum.visited字段中)。最小的节点覆盖是节点{a.fromNode for a。((a.datum.inMatching)和(a.fromNode.datum.visited))}}和节点{a.fromNode for a。在H.setOfArcs中, 如果(a.datum.inMatching)和(不是a.toNode.datum.visited)}。

可以通过矛盾证明证明这从最大值匹配到最小节点覆盖率, 采取了一个可能未被涵盖的弧度, 并考虑了关于a.fromNode和a.toNode是否属于的所有四种情况(无论是toNode还是fromNode)到匹配匹配中的任何弧。由于DFS访问节点的顺序以及匹配是最大匹配的事实, 每种情况都会导致矛盾。

假设我们有一个函数来执行这些步骤, 并在给定有向图G的情况下返回包含最小节点覆盖率和最大匹配匹配项的节点集:

ArcMatchingDatum = coll.namedtuple('ArcMatchingDatum', ['inMatching' ])
NodeMatchingDatum = coll.namedtuple('NodeMatchingDatum', ['visited'])

def dfs_helper(snodes, G):
    sniter, do_stop = iter(snodes), False
    visited, visited_uids = set(), set()
    while(not do_stop):
        try:
            stack = [ next(sniter) ]
            while len(stack) > 0:
                curr = stack.pop()
                if curr.uid not in visited_uids:
                    visited = visited.union( frozenset( {Node(curr.uid, NodeMatchingDatum(False))} ) )
                    visited_uids = visited.union(frozenset({curr.uid}))
                    succ = frozenset({a.toNode for a in G.setOfArcs if a.fromNode == curr})
                    stack.extend( succ.difference( frozenset(visited) ) )
        except StopIteration as e:
            stack, do_stop = [], True
    return visited

def get_min_node_cover(matching, bipartition):
    nodes = frozenset( { Node(n.uid, NodeMatchingDatum(False)) for n in bipartition.G.setOfNodes} )
    G = DiGraph(nodes, None)
    charcs = frozenset( {a for a in matching if ( (a.fromNode in bipartition.firstPart) and (a.toNode in bipartition.secondPart) )} )
    arcs0 = frozenset( { Arc(find_node_by_uid(a.toNode.uid, G), find_node_by_uid(a.fromNode.uid, G), ArcMatchingDatum(True) ) for a in charcs } )
    arcs1 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid, G), find_node_by_uid(a.toNode.uid, G), ArcMatchingDatum(True) ) for a in matching.difference(charcs) } )
    not_matching = bipartition.G.setOfArcs.difference( matching )
    charcs = frozenset( {a for a in not_matching if ( (a.fromNode in bipartition.secondPart) and (a.toNode in bipartition.firstPart) )} )
    arcs2 = frozenset( { Arc(find_node_by_uid(a.toNode.uid, G), find_node_by_uid(a.fromNode.uid, G), ArcMatchingDatum(False) ) for a in charcs } )
    arcs3 = frozenset( { Arc(find_node_by_uid(a.fromNode.uid, G), find_node_by_uid(a.toNode.uid, G), ArcMatchingDatum(False) ) for a in not_matching.difference(charcs) } )
    arcs = arcs0.union(arcs1.union(arcs2.union(arcs3)))

    G = DiGraph(nodes, arcs)
    bip = Bipartition({find_node_by_uid(n.uid, G) for n in bipartition.firstPart}, {find_node_by_uid(n.uid, G) for n in bipartition.secondPart}, G)
    match_from_nodes = frozenset({find_node_by_uid(a.fromNode.uid, G) for a in matching})
    match_to_nodes = frozenset({find_node_by_uid(a.toNode.uid, G) for a in matching})
    snodes = bip.firstPart.difference(match_from_nodes).difference(match_to_nodes)
    visited_nodes = dfs_helper(snodes, bip.G)
    not_visited_nodes = bip.G.setOfNodes.difference(visited_nodes)

    H = DiGraph(visited_nodes.union(not_visited_nodes), arcs)
    cover1 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (a.fromNode.datum.visited) ) } )
    cover2 = frozenset( {a.fromNode for a in H.setOfArcs if ( (a.datum.inMatching) and (not a.toNode.datum.visited) ) } )
    min_cover_nodes = cover1.union(cover2)
    true_min_cover_nodes = frozenset({find_node_by_uid(n.uid, bipartition.G) for n in min_cover_nodes})

    return min_cover_nodes

线性分配问题

线性分配问题包括在加权二分图中找到最大权重匹配。

像本文一开始那样的问题可以表示为线性分配问题。给定一组工人, 一组任务以及一个函数, 该函数指示将一个工人分配给一个任务的获利能力, 我们希望使所做的所有分配的总和最大化。这是一个线性分配问题。

假设任务和工作人员的数量相等, 尽管我将证明此假设很容易消除。在实现中, 我用弧a的属性a.datum.weight表示弧权重。

WeightArcDatum = namedtuple('WeightArcDatum', [weight])

Kuhn-Munkres算法

Kuhn-Munkres算法解决了线性分配问题。一个好的实现可能花费O(N ^ {4})时间(其中N是有向图中表示问题的节点数)。一个易于解释的实现将O(N ^ {5})(对于重新生成DiGraphs的版本)和O(N ^ {4})用于(对于维护DiGraphs的版本)。这类似于Edmonds-Karp算法的两种不同实现。

对于此说明, 我仅使用完整的二部图(那些二分之一的节点在另一半的一部分中的节点)。在工作人员中, 任务动机意味着工作人员与任务数量一样多。

这似乎是一个重要条件(如果这些集合不相等会怎样!), 但是很容易解决此问题;我将在上一节中讨论如何做到这一点。

此处描述的算法版本使用零权重弧的有用概念。不幸的是, 这个概念仅在我们要解决最小化问题时才有意义(如果不是最大化工作者任务分配的利润, 而是最小化此类任务的成本)。

幸运的是, 通过将每个弧的权重设置为Ma.datum.weight, 将M = max({G.setOfArcs中a的a.datum.weight), 可以很容易地将最大线性分配问题变成最小线性分配问题。 。更改电弧权重后, 原始最大化问题的解决方案将与最小化问题的解决方案相同。因此, 对于其余部分, 假设我们进行了更改。

Kuhn-Munkres算法通过未加权二分图中的最大匹配序列来解决加权二分图中的最小权重匹配。如果在线性赋值问题的有向图表示上找到了完美的匹配, 并且如果匹配中每个弧的权重为零, 则我们找到了最小权重匹配, 因为该匹配表明有向图中的所有节点都为以尽可能低的成本匹配弧(根据先前的定义, 成本不能低于0)。

不能将其他弧添加到匹配中(因为所有节点均已匹配), 也不应从匹配中删除任何弧, 因为任何可能的替换弧都将具有至少相同的权重值。

如果我们发现G的子图的最大匹配项(仅包含零个权重弧), 而不是完美匹配项, 则我们没有完整的解决方案(因为匹配项不完美)。但是, 我们可以通过改变G.setOfArcs中弧的权重来生成新的有向图H, 这样就可以出现新的0权重弧, 并且H的最优解与G的最优解相同。因为我们保证每次迭代至少产生一个零权重的弧, 我们保证在不超过| G.setOfNodes | ^ {2} = N ^ {2}个迭代的情况下达到完美匹配。

假设在bipartition bipartition中, bipartition.firstPart包含代表工作程序的节点, 而bipartition.secondPart代表代表任务的节点。

该算法首先生成一个新的有向图H。H.setOfNodes = G.setOfNodes。 H中的某些弧是从bipartition.firstPart中的节点n生成的。每个这样的节点n为bipartition.G.setOfArcs中的每个弧a在H.setOfArcs中生成一个弧b, 其中a.fromNode = n或a.toNode = n, b = Arc(a.fromNode, a.toNode, a.datum .weight-z)其中z = min(如果((x.fromNode == n)或(x.toNode == n)), 则G.setOfArcs中x的x.datum.weight。

从bipartition.secondPart中的节点n生成H中更多的弧。每个此类节点n为bipartition.G.setOfArcs中的每个弧a在H.setOfArcs中生成一个弧b, 其中a.fromNode = n或a.toNode = n, b = Arc(a.fromNode, a.toNode, ArcWeightDatum(a .datum.weight-z)), 其中z = min(如果((x.fromNode == n)或(x.toNode == n))), 则在G.setOfArcs中x的x.datum.weight。

KMA:接下来, 形成一个新的有向图K, 该图仅包含来自H的零权重弧, 以及入射在这些弧上的节点。在K中的节点上形成一个二分, 然后使用resolve_mbm(bipartition)在K上获得最大匹配(匹配)。如果匹配在H中是完美匹配(匹配的弧入射在H.setOfNodes中的所有节点上)匹配是线性分配问题的最佳解决方案。

否则, 如果匹配不完美, 则使用no​​de_cover = get_min_node_cover(matching, bipartition(K))生成K的最小节点覆盖率。接下来, 定义z = min({如果不在node_cover中, 则{在H.setOfArcs中为a.datum.weight})。如果在(.a.fromNode不在node_cover中)和(a, 在H.setOfArcs中为a定义节点= H.setOfNodes, arcs1 = { .toNode不在node_cover中)}, arcs2 = {Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weigh))如果((a.fromNode不在node_cover中)!=(a .toNode不在node_cover中)}, arcs3 = {Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weigh + z))对于H.setOfArcs中的((上一个表达式中的!=符号充当XOR运算符, 然后arcs = arcs1.union(arcs2.union(arcs3))。接下来, H = DiGraph(nodes, arcs)。标记为KMA, 算法一直进行到产生完美匹配为止, 这也是线性分配问题的解决方案。

def kuhn_munkres( bipartition ):
    nodes = bipartition.G.setOfNodes
    arcs = frozenset({})

    for n in bipartition.firstPart:
        z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} )
        arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) )

    for n in bipartition.secondPart:
        z = min( {x.datum.weight for x in bipartition.G.setOfArcs if ( (x.fromNode == n) or (x.toNode == n) )} )
        arcs = arcs.union( frozenset({Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weight - z)) }) )

    H = DiGraph(nodes, arcs)

    assignment, value = dict({}), 0
    not_done = True

    while( not_done ):
        zwarcs = frozenset( {a for a in H.setOfArcs if a.datum.weight == 0} )
        znodes = frozenset( {n.fromNode for n in zwarcs} ).union( frozenset( {n.toNode for n in zwarcs} ) )
        K = DiGraph(znodes, zwarcs)
        k_bipartition = bipartition(K)
        matching = solve_mbm( k_bipartition )
        mnodes = frozenset({a.fromNode for a in matching}).union(frozenset({a.toNode for a in matching}))
        if( len(mnodes) == len(H.setOfNodes) ):
            for a in matching:
                assignment[ a.fromNode.uid ] = a.toNode.uid
            value = sum({a.datum.weight for a in matching})
            not_done = False
        else:
            node_cover = get_min_node_cover(matching, bipartition(K))
            z = min( frozenset( {a.datum.weight for a in H.setOfArcs if a not in node_cover} ) )
            nodes = H.setOfNodes
            arcs1 = frozenset( {Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weigh-z)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) and (a.toNode not in node_cover)} )
            arcs2 = frozenset( {Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weigh)) for a in H.setOfArcs if ( (a.fromNode not in node_cover) != (a.toNode not in node_cover)} )
            arcs3 = frozenset( {Arc(a.fromNode, a.toNode, ArcWeightDatum(a.datum.weigh+z)) for a in H.setOfArcs if ( (a.fromNode in node_cover) and (a.toNode in node_cover)} )
            arcs = arcs1.union(arcs2.union(arcs3))
            H = DiGraph(nodes, arcs)

    return value, assignment

此实现为O(N ^ {5}), 因为它在每次迭代时都会生成一个新的最大匹配匹配。类似于Edmonds-Karp的前两个实现, 可以对该算法进行修改, 以便它跟踪匹配并使其智能地适应每次迭代。完成此操作后, 复杂度变为O(N ^ {4})。此算法的更高级和最新版本(需要一些更高级的数据结构)可以在O(N ^ {3})中运行。可以在这篇帖子中找到以上更简单的实现和更高级的实现的详细信息。

对弧权重的任何操作都不会修改算法返回的最终赋值。原因如下:由于我们的输入图始终是完整的二部图, 因此解决方案必须通过两个节点之间的弧线将一个分区中的每个节点映射到第二个分区中的另一个节点。请注意, 对弧权重执行的操作永远不会更改入射在任何特定节点上的弧的顺序(按权重排序)。

因此, 当算法终止于完美的完全二部匹配时, 为每个节点分配一个权重为零的弧, 因为在该算法中, 来自该节点的弧的相对顺序没有改变, 并且因为权重为零的弧是最便宜的弧, 因此完美的完全二部匹配可确保每个节点都存在一个这样的弧。这意味着所生成的解的确与原始线性分配问题的解相同, 而对电弧权重没有任何修改。

分配不平衡

似乎该算法非常有限, 因为如前所述, 它仅对完整的二部图(其中一半节点在二分之一的一部分中, 另一半在第二部分的一半)上运行。在工作人员中, 任务动机意味着工作人员与任务数量一样多(似乎很有限)。

但是, 有一个简单的转换可以消除此限制。假设工作人员少于任务, 我们添加一些虚拟工作人员(足以使生成的图成为完整的二部图)。每个虚拟工作人员都有一条从工作人员指向每个任务的弧线。每个这样的弧的权重为0(将其放置在匹配项中不会增加任何利润)。更改之后, 该图是一个完整的二分图, 我们可以解决。分配了虚拟工作程序的任何任务都不会启动。

假设任务多于工人。我们添加了一些虚拟任务(足以使生成的图成为完整的二部图)。每个虚拟任务都有一个从每个工作人员指向虚拟任务的弧线。每个这样的弧的权重为0(将其放置在匹配项中不会增加任何利润)。更改之后, 该图是一个完整的二分图, 我们可以解决。在此期间, 未分配任何分配给虚拟任务的工人。

线性分配示例

最后, 让我们以我一直在使用的代码为例。我将从此处修改示例问题。我们有3个任务:我们需要打扫浴室, 扫地并洗窗户。

可以使用的工人是爱丽丝, 鲍勃, 查理和黛安。每个工人给我们每个任务所需的工资。这是每个工人的工资:

  清洁浴室 扫地 清洗窗口
爱丽丝 $2 $3 $3
鲍勃 $3 $2 $3
查理 $3 $3 $2
黛安 $9 $9 $1

如果我们想花最少的钱, 但仍然完成所有任务, 那么谁应该做什么任务呢?首先引入一个虚拟任务, 以使图表示问题二分。

  清洁浴室 扫地 清洗窗口 没做什么
爱丽丝 $2 $3 $3 $0
鲍勃 $3 $2 $3 $0
查理 $3 $3 $2 $0
黛安 $9 $9 $1 $0

假设问题编码在有向图中, 那么kuhn_munkres(bipartition(G))将解决问题并返回赋值。你可以很容易地确认最佳(最低费用)分配费用为5美元。

以下是上述代码生成的解决方案的可视化:

最大流量可视化
最大流量可视化(剩余)

这就对了。现在, 你已经了解了有关线性分配问题的所有信息。

你可以在GitHub上找到本文中的所有代码。

赞(0)
未经允许不得转载:srcmini » 最大流量和线性分配问题

评论 抢沙发

评论前必须登录!