深入了解Flink Gelly图计算模型之Vertex-Centric

Flink是一款支持实时和离线场景下的大规模数据计算引擎,对标spark graphx,Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,接下来将分三篇文章分别来详细介绍每个模型的特点和具体使用方式。

Vertex-Centric模型概述

Vertex-Centric模型也被称之为Pregel,核心思想为:从图中每个顶点的角度表达计算。其计算过程以同步迭代地方式进行,每次迭代过程称之为一个Superstep(超步,也不知道这样翻译对不对),每个Superstep中处于活跃状态(active)的顶点并行地执行同样地UDF。顶点之间通过消息进行通信,在知道目标顶点ID的前提下,一个顶点可以向任何目标顶点发送消息。Superstep之间是同步执行的,下一个Superstep的执行需要依赖前一个Superstep的执行完成,因此上一个Superstep传递的消息会保证在下一个Superstep开始之前传递完毕。消息传递除了能在顶点之间传递消息之外,还能用于判断当前轮次的Superstep下,哪些顶点处于活跃状态,在每个轮次的Superstep中,只有那些接收到消息的顶点才会被认为是处于活跃状态。Vertex-Centric模型的执行过程如下图所示:

Vertex-Centric模型使用

  Vertex-Centric模型原理十分简单,使用起来也不难,只需要我们定义两个部分:每个顶点需要执行的用户自定义函数ComputeFunction和消息在每个迭代轮次中的组合方式MessageCombiner,其中MessageCombiner是可选的。下面我们以一段完整的示例代码来阐述Vertex-Centric模型的使用方法。

package com.quan.graph;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;

import java.util.LinkedList;
import java.util.List;

public class VC_SSSP {

    //Set 1 as the source.
    public static int srcId = 1;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List> edgesList = new LinkedList<>();
        edgesList.add(new Edge(1, 2, 12));
        edgesList.add(new Edge(1, 6, 3));
        edgesList.add(new Edge(1, 7, 14));
        edgesList.add(new Edge(2, 6, 7));
        edgesList.add(new Edge(2, 3, 10));
        edgesList.add(new Edge(3, 4, 3));
        edgesList.add(new Edge(3, 5, 5));
        edgesList.add(new Edge(3, 6, 4));
        edgesList.add(new Edge(4, 5, 4));
        edgesList.add(new Edge(5, 6, 2));
        edgesList.add(new Edge(5, 7, 8));
        edgesList.add(new Edge(6, 7, 9));

        DataSet> edges = env.fromCollection(edgesList);

        // Read the input data and create a graph.
        Graph graph = Graph.fromDataSet(edges, new InitVertices(), env);

        // Convert the graph to undirected.
        Graph undirected_graph = graph.getUndirected();

        // Define the maximum number of iterations.
        int maxIterations = 10;

        // Execute the vertex-centric iteration.
        Graph result = undirected_graph.runVertexCentricIteration(
                new SSSPComputeFunction(), new SSSPMessageCombiner(), maxIterations);

        // Extract the vertices as the result.
        DataSet> singleSourceShortestPaths = result.getVertices();

        // Print the result.
        singleSourceShortestPaths.print();
    }

    // User Define Function.
    @SuppressWarnings("serial")
    public static final class SSSPComputeFunction extends ComputeFunction {

        @Override
        public void compute(Vertex vertex, MessageIterator messages) throws Exception {
            Integer minDistance = vertex.getId().equals(srcId) ? 0 : Integer.MAX_VALUE;
            for (Integer msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }

            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                for (Edge e : getEdges()) {
                    sendMessageTo(e.getTarget(), minDistance + e.getValue());
                }
            }
        }
    }

    // Message combiner
    @SuppressWarnings("serial")
    public static final class SSSPMessageCombiner extends MessageCombiner {

        @Override
        public void combineMessages(MessageIterator messageIterator) throws Exception {
            Integer minMessage = Integer.MAX_VALUE;
            for (Integer msg : messageIterator) {
                minMessage = Math.min(minMessage, msg);
            }
            sendCombinedMessage(minMessage);
        }
    }

    @SuppressWarnings("serial")
    private static final class InitVertices implements MapFunction {

        public Integer map(Integer id) {
            return Integer.MAX_VALUE;
        }
    }

这里我们用一个下图所示的无向图为例,初始化的时候将每个节点的值都设置成当前数据类型的最大值,其中顶点ID为1的点作为源点,该图的单源点最短路径(SSP)执行结果如下图所示。在第一个Superstep期间,所有顶点都处于活跃状态,但是根据compute的执行过程,最终仅有源点可以向其邻居传播距离。

在接下来的Superstep骤中,每个顶点检查其接收到的消息并选择出它们之间的最小距离,如果这个距离小于它的当前值,该顶点就会更新它的当前值,并为它的邻居产生消息(当前最小值+到邻居的边的距离)。如果一个顶点在上一步中没有改变它的值,那么该顶点在当前轮次的迭代中不执行任何compute操作,也不向下一个Superstep中的任何顶点发送消息。当所有顶点的状态不再改变或达到最大迭代次数时算法收敛。在该算法中,可以使用MessageCombiner减少发送到目标顶点的消息数量。

Vertex-Centric模型参数配置

  可以使用VertexCentricConfiguration对象配置Vertex-Centric模型。目前可指定的参数有
Name: 可以使用setName()方法 为vertex-centric迭代模型指定一个名称。
Parallelism: 可以使用setParallelism()方法为每个轮次迭代中顶点执行ComputeFunction计算的并行度。
Solution set in unmanaged memory: 可以使用setSolutionSetUnmanagedMemory() 方法来指定结果集是否保存在托管内存中,默认情况下结果集是运行在托管内存中。
Aggregators: 可以使用registerAggregator()方法来为每个迭代注册聚合函数,迭代聚合器在每个超步骤中将所有聚合全局地组合一次,并使它们在下一个超步骤中可用。

Broadcast Variables: 可以使用addBroadcastSet() 方法为 ComputeFunction添加广播变量(Broadcast Variables)。

// configure the iteration
VertexCentricConfiguration parameters = new VertexCentricConfiguration();

// set the iteration name
parameters.setName("Gelly Iteration");

// set the parallelism
parameters.setParallelism(16);

//Defines whether the solution set is kept in managed memory
parameters.setSolutionSetUnmanagedMemory(true);

// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());
展开阅读全文

页面更新:2024-04-12

标签:源点   模型   轮次   顶点   活跃   状态   距离   过程   消息   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top