Azero: 一个大规模动态负载均衡图处理系统 hsa@net.pku.edu.cn
目录 问题提出 空间向量划分算法 利用SVPA进行动态负载均衡 Azero系统架构、API及实现 待完成工作 图处理系统性能制约因素 现有系统存在问题 主要贡献 空间向量划分算法 利用SVPA进行动态负载均衡 Azero系统架构、API及实现 待完成工作
问题背景 大规模图处理 计算模型 BSP vs Mapreduce 图的切分与存储
现有图处理系统 Pregel (SIGMOD 2010) Giraph (Hadoop Submit 2011) Mizan (EuroSys 2013) …
性能制约因素 Worker间网络通信 Worker的负载不均衡 Cross-edge,图切分算法 瓶颈节点 需要动态负载均衡 算法行为 图结构 底层平台 需要动态负载均衡
现有系统存在问题 系统性能受瓶颈Worker制约 图的切分未考虑图顶点的连接关系 负载均衡过程未保持图顶点连接的局部性 负载均衡算法开销大 大量Cross-edge的产生 负载均衡过程未保持图顶点连接的局部性 负载均衡算法开销大 计算复杂度和网络开销 顶点迁移时大量的数据传输
Motivation 寻找一个新的图处理负载均衡解决方案 负载均衡同时保持图顶点连接局部性 简单、高效
主要贡献 SVPA图切分算法 利用SVPA进行动态负载均衡的框架 新的大规模图处理系统Azero 简单高效 负载均衡同时保持图的局部性 顶点低成本迁移、Superstep内负载均衡 新的大规模图处理系统Azero 计算框架,API 分布式索引,缓存策略等
part2 SVPA算法
图的切分 Worker集合W={w1, w2, …},|W|=N 图G=<V, E>,v的后继元集+(v) 切分方案Ptt: VW wi上顶点集合PPI(i)=Ptt-1[{wi}] 跨边集CE={<v1, v2>|<v1, v2>EPtt(v1)Ptt(v2)} 切分均匀性:max({card(PPI(i))})低 切分局部性:card(CE)小
空间向量划分算法SVPA Space Vector Partition Algorithm 产生切分方案的函数SVPA: GRN(VW) N维欧氏空间SN 函数CN: VRN将顶点v映射到SN中 𝐶 𝑁 𝑣 = 𝑐𝑎𝑟𝑑 Γ + 𝑣 ∩PPI 1 𝑐𝑎𝑟𝑑 Γ + 𝑣 , ⋯, 𝑐𝑎𝑟𝑑 Γ + 𝑣 ∩PPI 𝑁 𝑐𝑎𝑟𝑑 Γ + 𝑣 . CN(v)反映v与各worker顶点关联的紧密程度
CN(v) w1 w2
CN(v) w1 w2 w3 𝐶 3 𝑣 = 1 6 , 3 6 , 2 6 𝐶 3 𝑣 = 2 5 , 2 5 , 1 5
预切分方案Proposed Partition 新图的PPI= 利用预切分方案计算CN METIS 预切分方案和CN计算的分布式化 网络开销(|E|/N)
划分向量p pRN, p0 使用p将空间划分为N个部分{Rp,i} 𝑖=1 𝑁 𝐶 𝑁 𝑣 .𝑖 =1. xRp,i, 若px的最大分量维度为i 𝑖=1 𝑁 𝐶 𝑁 𝑣 .𝑖 =1. 图中所有顶点位于一张超平面H上 关键性质: Area 𝐻∩ 𝑅 𝑝, 𝑖 Area 𝐻∩ 𝑅 𝑝, 𝑗 = p.i 𝑝.𝑗 . 简单起见, i=1 𝑁 𝑝.𝑖 =1.
SVPA 函数原型:G<V, E>RN(VW) 定义SVPA(G<V, E>, p) = SVPp 其中 𝑆𝑉𝑃 𝑝 𝑣 = 𝑤 𝑖 , s.t. 𝐶 𝑁 𝑣 ∈ 𝑅 𝑝, 𝑖 .
SVPA w1 划分向量p w2
切分效果 V中顶点在SN中分布大致均匀 因此SVPp划分满足 故SVPA可产生满足均匀性的切分 SVPA产生的切分满足局部性 𝑐𝑎𝑟𝑑 𝑃𝑃𝐼 𝑖 𝑐𝑎𝑟𝑑 𝑃𝑃𝐼 𝑗 ≈ p.i 𝑝.𝑗 . 故SVPA可产生满足均匀性的切分 SVPA产生的切分满足局部性 Rp,i中顶点与PPI(i)关联的紧密程度比~Rp,i高 SVPA切分结果的局部性与METIS相近
part3 利用SVPA进行动态负载均衡
负载均衡方案 六步负载均衡方案 (Superstep结束) Master收集Worker信息得到负载向量L Master计算新的划分向量new_p=LBF(old_p, L) Master将new_p发送给所有索引节点 索引节点计算新的切分方案SVPnew_p 索引节点计算顶点迁移方案M 节点迁移 (下一个Superstep开始)
负载均衡方案 Worker Index Master new ptt 4 3 mig pln 5 1 6 new p 2
负载向量L L = (load_w1, load_w2, …, load_wN) load衡量指标 worker运行总时间 网络通信量 …
负载均衡函数LBF new_p = LBF(old_p, L) 将Worker分为两类:Over-load和Under-load 改变划分向量,使前者的顶点向后者迁移 使得新Superstep中各Worker负载近似相等 基于以下假设 Worker中各顶点产生的负载近似相等 顶点在相邻Superstep产生的负载近似相等
LBF两个版本 LBF_O LBF_U 基于Over-load 针对性处理瓶颈Worker 基于Under-load 可用于Superstep内实时负载均衡* 二者效果差不多 具体实现中采用LBF_u
伪代码 LBF_O(old_p, L) average_load AverageValueOfArrayElements(L) delta_p 0 delta_l 0 for i 1 to N do L[i] L[i] - average_load if L[i] > 0 then new_p[i] old_p[i]*average_load/(average_load+L[i]) delta_p delta_p + old_p[i] – new_p[i] delta_l delta_l + L[i] if delta_l > 0 then delta -delta_p / delta_l if L[i] < 0 then new_p[i] old_p[i] + delta * L[i] return new_p 迁出 迁入
迁移方案M 索引节点根据SVPnew_p和SVPold_p计算方案M M是VWW上的三元关系 M= 𝑣, 𝑤 𝑓𝑟𝑜𝑚 , 𝑤 𝑡𝑜 𝑣∈𝑑𝑜𝑚 𝑆𝑉𝑃 𝑜𝑙𝑑_𝑝 ⊗ 𝑆𝑉𝑃 new_p . 对M中元素执行migrate(v, wfrom, wto)进行迁移
性能分析 顶点索引的空间复杂度(|V|*N) 负载均衡算法 比正常索引(|V|)大 备份简单(更新索引时只需备份p) 计算时间复杂度(N+|V|+|V|/N)= (|V|) 较大 网络通信复杂度(N+N^2)=(N^2) 很小 负载均衡(|V|)计算可通过矩阵运算等方式快速完成
顶点低成本迁移 两步迁移 预复制 迁移/复制 避免Message迁移 输入图时将边缘顶点复制到多个Worker上 顶点同时只有一个副本处于active状态 迁移/复制 SVPA切分方案的一致性 输入图时通过给出若干虚拟的负载,将边缘顶点复制到多个worker上 一致性:对于相同的p,SVPA总是给出相同的切分方案
状态机 1 Vertex Replication Receiving Messages Computable Active To be transfered To be active Sleeping
两步迁移 Step 1 Step 2 Before Migration Worker 1 Worker 2 ACT ACT ACT ACT TBT TBT TBT TBA TBT TBA TBA TBA RP-1 RP-2 SLP SLP SLP SLP Computable Computable Receiving Messages Receiving Messages
Superstep内实时负载均衡 Superstep内转移工作至提前完成的Worker 顶点稳定性函数 𝑆𝑡𝑎𝑏𝑙𝑒 𝑣, 𝑝 = 𝑀𝑎𝑥𝑉𝑎𝑙𝑢𝑒(𝑝⋅ 𝐶 𝑁 𝑣 ) 𝑆𝑒𝑐𝑜𝑛𝑑𝑀𝑎𝑥𝑉𝑎𝑙𝑢𝑒(𝑝⋅ 𝐶 𝑁 𝑣 ) Worker按顶点Stable由大到小的顺序处理 wi提前完成时Manager计算迁往wi的顶点集 实时迁移顶点(未计算/已计算) 将频繁迁移顶点的msg保存在索引节点上 Stable用来衡量当其它worker负载低时,该顶点在实时负载均衡过程中被迁移走的可能性
part4 Azero系统的设计与实现
图处理流程 … Graph Data BSP model Worker Graph Partitioning Output Manager
Barrier Synchronization 计算模型 w1 w2 w3 w4 w5 Local Computing Communacation Barrier Synchronization
v v 顶点计算过程 msgs(v) Superstep k-1 Superstep k msgs(v’) msgs(v”)
API public class Vertex { public long getVID(); public Object getValue(); public void setValue(Object value); public Set<Long> getEdges(); } public interface VertexProcessor { public void compute(WorkerContext context); } public class WorkerContext { public Vertex getCurrentVertex(); public Map<Long, List<String>> getMessages(); public void sendMessage(long vid, String message); public void sendMessageToAllEdges(String message); public long getNumSuperSteps(); public long getNumVertexes(); public void voteToHalt(); }
PageRank Example public class PageRank implements VertexProcessor { public void compute(WorkerContext context) { Vertex v = context.getCurrentVertex(); if (context.getNumSuperSteps() >= 1) { double sum = 0; for (List<String> list : context.getMessages().values()) { sum += Double.valueOf(list.get(0)); } v.setValue(Double.valueOf(0.15/context.getNumVertexes+0.85*sum)); if (context.getNumSuperSteps() < 30) { long edges = v.getEdges.size(); context.sendMessageToAllEdges(String.valueOf(v.getValue()/edges)); } else { voteToHalt();
Azero’s Arichitecture Distributed Hash Worker Manager Azero’s Arichitecture Manager负责计算进程的全局同步、状态收集、划分向量计算等 Worker负载顶点的实际计算与通信、使用SVPA维护分布式索引、进行顶点迁移等
分布式索引 散列函数H: VW 顶点v的索引<v, Ptt(v)>保存在节点H(v)上 三步顶点寻址方案 计算H(v) 向H(v)请求v的索引Ptt(v) 向Ptt(v)发送消息<src, dst=v, msg>
分布式索引 H(v) <v, Ptt(v)> Where is v? Ptt(v) v msg Ptt(v) Worker Distributed Hash Worker v msg Ptt(v) <v, Ptt(v)>
索引Replication 选择两个散列函数H1和H2 将<v, Ptt(v)>同时保存在H1(v)和H2(v)上 ∀𝑣∈𝑉, 𝐻 1 𝑣 ≠ 𝐻 2 𝑣 . 将<v, Ptt(v)>同时保存在H1(v)和H2(v)上 节点H1(v)失效时向H2(v)查询
索引Cache 将<v, Ptt(v)>保存在Ptt(v)上 将<v, Ptt(v)>缓存在wi上 当PPI(i)对v的连续k次寻址结果相同时 k=3
流程图 寻址 顶点 找到? 查询本地索引 寻址失败 查询本地缓存 命中? 查询H1(v) 查询H2(v) 更新本地缓存 可缓存? 寻址成功 Y N
Azero Worker Storage Layer Index Vertex Addressor Link Layer Message Manager Vertex Addressor Link Layer Message Receiver Sender API / Context Controller User Function Network Local Disk Azero Worker SVPA
实现细节 Message发送方式 网络连接方式 … 计算时并行发送:节省时间 计算后统一发送:Combiner优化 Netty:性能较好 RMI:编程简洁 …
待完成工作 测量 顶点坐标分布与负载均衡灵活性 切图结果均匀性与局部性 动态负载均衡的实际效果与比较 索引缓存策略的实际效果与比较 系统可扩展性
Q&A Thanks