School of EECS, Peking University Fault Tolerance http://net.pku.edu.cn/~course/cs501/2012 Hongfei Yan School of EECS, Peking University 5/16/2012
"Failure is not an option. It comes bundled with your software "Failure is not an option. It comes bundled with your software.“ (--unknown) "You know you have [a distributed system] when the crash of a computer you've never heard of stops you from getting any work done.“ (--Leslie Lamport) http://en.wikipedia.org/wiki/Leslie_Lamport Lamport’s research contributions have laid the foundations of the theory of distributed systems. Among his most notable papers are “Time, Clocks, and the Ordering of Events in a Distributed System”,[4] which received the PODC Influential Paper Award in 2000,[5] “The Byzantine Generals Problem”,[6] “Distributed Snapshots: Determining Global States of a Distributed System”[7] and “The Part-Time Parliament”.[8] These papers relate to such concepts as logical clocks (and the happened-before relationship) and Byzantine failures. They are among the most cited papers in the field of computer science[9] and describe algorithms to solve many fundamental problems in distributed systems, including: the Paxos algorithm for consensus, the bakery algorithm for mutual exclusion of multiple threads in a computer system that require the same resources at the same time and the snapshot algorithm for the determination of consistent global states.
Some real-world datapoints Drive manufacturers specify the reliability of their products in terms of two related metrics: the annualized failure rate (AFR), which is the percentage of disk drives in a population that fail in a test scaled to a per year estimation; and the mean time to failure (MTTF). Sources: Disk Failures in the Real World: What Does an MTTF of 1,000,000 Hours Mean to You?, Bianca Schroeder and Garth A. Gibson (FAST 07) [pdf] Failure Trends in a Large Disk Drive Population, Eduardo Pinheiro, Wolf-Dietrich Weber, and Luiz André Barroso (FAST’07) [pdf]
Contents 01: Introduction 02: Architectures 03: Processes 04: Communication 05: Naming 06: Synchronization 07: Consistency & Replication 08: Fault Tolerance 09: Security 10: Distributed Object-Based Systems 11: Distributed File Systems 12: Distributed Web-Based Systems 13: Distributed Coordination-Based Systems
Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Distributed commit (++) Recovery
Fault handling approaches Fault prevention: prevent the occurrence of a fault Fault tolerance: build a component in such a way that it can meet its specifications in the presence of faults (i.e., mask the presence of faults) Fault removal: reduce the presence, number, seriousness of faults Fault forecasting: estimate the present number, future incidence, and the consequences of faults 建立一个可靠的系统与控制故障紧密相关。防止、解决和预报故障三者之间是有差别的。对我们来说最重要的问题是容错,它意味着系统即使在发生故障时也能提供服务。
Design Goal (with regard to fault tolerance): Design a (distributed) system that can recover from partial failures without affecting correctness or significantly impacting overall performance
分布式系统设计出发点 一个进程P可能依赖不同计算机上其他进程提供的服务,如果那些进程由于出现错误或故障而失去联系,则P无法正常运行。 计算机死机,或许网络断开,或许对方负载太重,暂时无法提供所需的服务 如果选取一组计算机联合执行同一个任务,当个别或少数计算机出现错误和故障时,大多数计算机仍然能够正常地完成任务 E.g., 分布式复制系统 第一个出发点告诉我们,一个进程所依赖的服务可能会以各种各样、相互独立的错误而宣告失败。第二个出发点告诉我们,利用多进程/多计算机的优点,可能会淹去潜在的错误或故障。这两点看上去既相互关联又相互矛盾,设计者的责任就是从关联和矛盾中找到答案。 Basics: A component provides services to clients. To provide services, the component may require the services from other components ⇒ a component may depend on other components Specifically: A component C depends on C* if the correctness of C's behavior depends on the correctness of C‘* behavior. Note: For distributed systems, components can be either processes or channels Note: For distributed systems, components can be either processes or channels
Dependability Being fault tolerant is strongly related to what are called dependable systems. Some properties of dependability: Availability Readiness for usage Reliability Continuity of service delivery Safety Low probability of catastrophes Maintainability How easy can a failed system be repaired Availability,说明系统已准备好,马上就可以使用。通常,它指在任何给定的时刻,系统都可以正确地操作,可根据用户的行为来执行它的功能。换句话说,高度可用的系统在任何给定的时刻都能及时地工作。 Reliability是指系统可以无故障地持续运行。与可用性相反,可靠性是根据时间间隔而不是任何时刻来进行定义的。 Safety是指系统偶然出故障的情况下能正确操作而不会造成任何灾难。 Maintainability是指发生故障的系统被恢复的难易程度。
Terminology Failure: When a component is not living up to its specifications, a failure occurs Error: The part of a component's state that can lead to a failure Fault: The cause of an error. Fault types generally (in terms of their properties): Transient: occur once and then disappear Intermittent: occur, then vanish, then reappear Permanent: continues to exist 如果一个系统无法满足对用户的承诺,这个系统被称为失灵(fail)。 一个系统在正常工作时会在若干种运行状态之间变迁,一旦出现异常,则该系统进入错误(error)状态。一个系统的错误状态可能是导致系统失灵的原因。这里使用了一个模糊的术语“可能”,因为有的错误状态并不一定导致系统失灵,如系统死锁是一种错误状态,当死锁被排除后,系统又可以恢复正常运行。 造成错误的原因是故障(fault),故障的种类繁多,软件和硬件都可能产生故障,而且,某些场合下,与系统无关的外部环境也可能引发故障。
Failure Models (in terms of their specifications) Crash failures: A component simply halts, but behaves correctly before halting Omission failures: A component fails to respond to incoming requests Receive omission: Fails to receive incoming messages Send omission: Fails to send messages Timing failures: The output of a component is correct, but lies outside a specified real-time interval E.g., performance failures: too slow Response failures: A component’s respond is incorrect Value failure: The wrong value is produced State transition failure: Execution of the component's service brings it into a wrong state Arbitrary (byzantine) failures: A component may produce arbitrary output and be subject to arbitrary timing failures Note: Crash failures are the least severe; arbitrary failures are the worst To get a better grasp on how serious a failure actually is, several classification schemes have been developed. One such scheme is based on schemes described in (Cristian, 1991; and Hadzilacos and Toueg, 1993) Cristian 分类法假定仅当一台服务器的状态和返回给客户的结果都正确无误时,该客户请求的服务才算正确完成。 定义故障语义(failure semantics)的重要性在于防止或避免继承性故障。根据这个假定,给出Cristian的服务故障分类(Failure Models)。 RPC的至少一次语义的重发技术可能造成远程过程的多次调用,如果服务器一端没有采取适当的措施,就会出现应答故障(状态变迁故障)
What to do about failures to obtain Fault Tolerance? Main approach: mask failures using redundancy Information redundancy E.g., a Hamming code can be added to transmitted data to recover from noise on the transmission line. Time redundancy is especially helpful for transient or intermittent faults. E.g., using transactions Physical redundancy E.g., 747s have four engines but can fly on three
Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Distributed commit (++) Recovery
Process Resilience Basic issue: Protect against faulty processes Solution: Process Groups Replicate and distribute computations in a group. provide abstraction for a collection of processes “identical” processes all members receive all messages sent to the group The purpose of introducing groups is to allow processes to deal with collections of processes as a single abstraction. Thus a process can send a message to a group of servers without having to know how many there are or where they are, which may change from one call to the next. 编组的目的是把若干等同的进程抽象成一个具备容错能力的虚进程。因此,组内成员要具备彼此通信的能力。然而,由于一组等同进程分布在不同服务器上,我们必须假定组内任一个进程都可以把信件送达该组的所有服务器上,而无需知道究竟有多少服务器,这些服务器位于何处,以及这些服务器处于何种状态等。
Process Groups Flat groups: Good for fault tolerance as information exchange immediately occurs with all group members; however, may impose more overhead as control is completely distributed (hard to implement). Hierarchical groups: All communication through a single coordinator ⇒ not really fault tolerant and scalable, but relatively easy to implement. Q: What consistency protocols fit best for each approach? Issue: group membership
Groups and Failure Masking (1/4) Terminology: a k-fault tolerant group can mask any k concurrent member failures (k is called degree of fault tolerance). Problem: how large does a k-fault tolerant group need to be? Assume crash/performance failure semantics ⇒ a total of k + 1 members are needed to survive k member failures. Assume arbitrary failure semantics and group output defined by voting collected by the client ⇒ a total of 2k+1 members are needed to survive k member failures. Assumption: all members are identical, and process all input in the same order ⇒ only then are we sure that they do exactly the same thing. Problem: how large does a k-fault tolerant group need to be? From a client perspective: Assume crash/performance failure semantics ⇒ a total of k + 1 members are needed to survive k member failures. Assume arbitrary failure semantics and group output defined by voting collected by the client ⇒ a total of 2k+1 members are needed to survive k member failures Assumption: all members are identical, and process all input in the same order (atomic multicast problem) From a process group perspective (reaching agreement) the problem is more complex 可以参看”分布式系统, by 李西宁, 科学出版社, 2006. ”的216~217页 当客户面临多个执行结果时,总可以使用选举的方法来筛选出大多数一致的结果。然而,在分布式应用中,还可能面临另外一种情况,即正确结果的选择不是由客户提出,而是由编组内的进程来达成一致的协定(agreement)。例如,编组内选举协调者,决定是否应该提交一件事务等都要求组内进程通过某种通信协议来达成协定。
Groups and Failure Masking (2/4) Assumption: Group members are not identical, i.e., we have a distributed computation Problem: Non-faulty group members should reach agreement on the same value Observation: Assuming arbitrary failure semantics, we need 3k + 1 group members to survive the attacks of k faulty members Note: This is also known as Byzantine failures. Essence: We are trying to reach a majority vote among the group of loyalists, in the presence of k traitors ⇒ need 2k + 1 loyalists. Examples: Leader election, commit, etc
Groups and Failure Masking (3/4) The Byzantine generals problem for 3 loyal generals and 1 traitor. what they send to each other what each one got from the other what each one got in second step The problem was originally studied by Lamport et al. (1982) and is also known as the Byzantine agreement problem, referring to the numerous wars in which several armies needed to reach agreement on, for example, troop strengths while being faced with traitorous generals, conniving lieutenants, and so on. Consider the following solution, described in Lamport et al. (1982). In this case, we assume that processes are synchronous, messages are unicast while preserving ordering, and communication delay is bounded. We assume that there are N processes, where each process i will provide a value vi to the others. The goal is let each process construct a vector V of length N, such that if process i is nonfaulty, V [i ] = vi . Otherwise, V [i ] is undefined. We assume that there are at most k faulty processes. The Byzantine generals problem for 3 loyal generals and 1 traitor. a) The generals announce their troop strengths (in units of 1 kilosoldiers). b) The vectors that each general assembles based on (a) c) The vectors that each general receives in step 3.
Groups and Failure Masking (4/4) Issue: What are the necessary conditions for reaching agreement? Process: Synchronous ⇒ operate in lockstep Delays: Are delays on communication bounded? Ordering: Are messages delivered in the (real time) order they were sent? Transmission: Are messages sent one-by-one, or multicast? http://net.pku.edu.cn/~course/cs501/2008/resource/steen_vrije/books/ds2/fixes/printrun_1/332.pdf http://net.pku.edu.cn/~course/cs501/2008/resource/steen_vrije/books/ds2/fixes/printrun_1/333.pdf As it turns out, reaching agreement is only possible for the situations shown in Fig. 8-4. In all other cases, it can be shown that no solution exists. Note that most distributed systems in practice assume that processes behave asynchronously, message transmission is unicast, and communication delays are unbounded. As a consequence, we need to make use of ordered (reliable) message delivery, such as provided as by TCP. Fig. 8-4 illustrates the nontrivial nature of distributed agreement when processes may fail.
Summary so far Use replication to provide fault tolerance When process groups are used, reaching agreement is a key requirement. Main results: Two army-problem Impossible to design a protocol that guarantees that reach agreement is always reached with unreliable unicast communication Byzantine generals problem: In a system with k faulty processes, agreement can be achieved only if 2k+1 correctly functioning processes are present. (Lamport, 1982) If messages cannot be guaranteed to be delivered within a known, finite time, no agreement is possible even with one faulty process. (Fischer, 1985) 在不可靠传输的条件下,即使是无错误的进程,在两个进程之间达成协议也是不可能的。 假定通信良好,但是进程却并不好。一个经典的军事问题称为Byzantine generals problem. 假定一个分布式系统无法保证在有限时间内可靠地提交信件,哪怕只有一台服务器出现故障,我们都无法达成一致性协定。
Failure Detection Essence: Detect failures through timeout mechanisms Setting timeouts properly is difficult and application dependent You cannot distinguish process failures from network failures Need to consider failure notification throughout the system: Gossiping (i.e., proactively disseminate a failure detection) Disseminate [di‘semineit] v.散布 On failure detection algorithms in overlay networks Zhuang, S.Q. Geels, D. Stoica, I. Katz, R.H. This paper appears in: INFOCOM 2005. 24th Annual Joint Conference of the IEEE Computer and Communications Societies. Proceedings IEEE Publication Date: 13-17 March 2005 Volume: 3, On page(s): 2112- 2123 vol. 3 Active approach, a node periodically sends keep-alive messages Passive approach #Gossiping (i.e., proactively disseminate a failure detection) #On failure detection, pretend you failed as well http://www.bloglines.com/preview?siteid=16370201&itemid=23 In distributed computing, the widely adopted technique of failure detection is timeouts. If we do not receive a response or a heartbeat signal from a process, we assume that it has failed. Recently, I read an interesting paper by Werner Vogels titled "World Wide Failures". He provides an interesting analogy. "We do not assume that some person is dead just because he does not answer the phone, we will do some background investigation like ask his landlord & neighbors, check whether the electricity & water is being consumed and so on." The point is, timeouts at the transport layer are not always sufficient for failure detection. We can consult other data sources such as the Operating System where the process runs, use information about status of network routing nodes or can augment with application-specific solutions. In other words, some sort of external failure detection mechanism would generally provide better results. The interesting thing is, with timeouts, we cannot detect Byzantine failures in processes. However, with an external failure detector (Failure Investigator), we may be able to detect processes that have been misbehaving for sometime, but still are connected at the transport level.
Reliable client-server communication (++) Reliable group communication Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Distributed commit (++) Recovery In many cases, fault tolerance in distributed systems concentrates on faulty processes. However, we also need to consider communication failures.
Reliable Communication So far: Concentrated on process resilience (by means of process groups). What about reliable communication channels? Error detection: Framing of packets to allow for bit error detection Use of frame numbering to detect packet loss Error correction: Add so much redundancy that corrupted packets can be automatically corrected Request retransmission of lost, or last N packets Observation: Most of this work assumes point-to-point communication
Reliable RPC (1/3) What can go wrong?: Client cannot locate server Client request is lost Server crashes Server response is lost Client crashes [1:] Relatively simple - just report back to client [2:] Just resend message (and use messageID to uniquely identify messages) 来看一下在使用RPC这样的高级通信工具时的客户-服务器通信。
Reliable RPC (2/3) [3] Server crashes are harder as you don't know what the server has already done: Problem: we need to decide on what to expect from the server At-least-once-semantics: The server guarantees it will carry out an operation at least once, no matter what. At-most-once-semantics: The server guarantees it will carry out an operation at most once, but possibly none at all. Figure 7-6. A server in client-server communication. (a) The normal case. (b) Crash after execution. (c) Crash beore execution. 比较麻烦的部分在于正确对待(b)和(c)之间的不同。
Reliable RPC (3/3) [4:] Lost replies ⇒ Detection hard: because it can also be that the server had crashed. You don't know whether the server has carried out the operation Solution: None, except that you can try to make your operations idempotent: repeatable without any harm done if it happened to be carried out before. [5:] Client crashes ⇒ The server is doing work and holding resources for nothing (called doing an orphan computation). Orphan is killed by client when it reboots Broadcast new epoch number when recovering ⇒ servers kill orphans Require computations to complete in a T time units. Old ones are simply removed. 把客户一端的时间编排成一系列纪元(epoch)。每当客户重新启动计算机,便向所有相关的服务器发送一封信件,通知该客户开始了一个新纪元。于是,服务器一端就可以根据纪元的顺序号,删除那些属于过时纪元的RPC计算。
Reliable client-server communication (++) Reliable group communication Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Also called reliable multicasting Distributed commit (++) Recovery 组播通信、广播通信是许多分布式同步算法的基本通信手段,在采用冗余技术的容错系统中,可靠的组播通信更是系统赖以成功的关键。实际上,讨论的进程/服务器编组结构也正是基于组播通信这一概念,组内任一成员发出的信件都必须可靠地送达组内所有其他成员。一般来说,发出组播信件的进程没有必要提供组内所有成员的地址,只需要提供编组标示符。而实现组播通信的软件(中间件)把编组标示符影射成一张成员地址表,把信件送到表中的所有地址。这样做的目的是提高编组的透明性,把编组的内部结构和通信的具体实现隐蔽起来。
Reliable Multicasting (1/2) Model: a multicast channel c with two (possibly overlapping) groups: The sender group SND(c) of processes that submit messages to channel c The receiver group RCV(c) of processes that receive messages from channel c Possible reliability requirements: Simple reliability: No messages lost If process P ∊ RCV(c) at the time message m was submitted to c, and P does not leave RCV(c), m should be delivered to P Atomic multicast: All active processes receive the same thing Ensure that a message m submitted to channel c is delivered to process P ∊ RCV(c) only if m is delivered to all members of RCV(c) 如果假定存在一个协议说明谁是组的成员,那么问题就会简单一些。特别是,如果我们假定进程都不会失败,而且在通信进行期间不会有进程加入或离开组,那么可靠多播就简单地意味着每个消息都应该被传递到组的每个当前成员处。在最简单的情况下,不要求所有的组成员都按同样的顺序接收消息,但是有时需要这个特性。 It is generally also required that all messages are delivered in the same order to all processes. This is also known as the atomic multicast problem.
Reliable Multicasting (2/2) Observation: If we can stick to a local-area network, reliable multicasting is ‘easy’ Principle: Let the sender log messages submitted to channel c: If P sends message m, m is stored in a history buffer Each receiver acknowledges the receipt of m, or requests retransmission at P when noticing message lost Sender P removes m from history buffer when everyone has acknowledged receipt Question: Why doesn't this scale? If there are N receivers, the sender must be prepared to accept at least N acknowledgements. With many receivers, the sender may be swamped with such feedback messages, which is also referred to as a feedback implosion. In addition, we may also need to take into account that the receivers are spread across a wide-area network.
Scalable Reliable Multicasting: Feedback Suppression Basic idea: Let a process P suppress its own feedback when it notices another process Q is already asking for a retransmission Assumptions: All receivers listen to a common feedback channel to which feedback messages are submitted Process P schedules its own feedback message randomly, and suppresses it when observing another feedback message Question: Why is the random schedule so important? If we assume that retransmissions are always multicast to the entire group, it is sufficient that only a single request for retransmission reaches S. For this reason, a receiver R sent until some random time has elapsed. If, in the meantime, another request for retransmission for m reaches R, R will suppress its own feedback, knowing that m will be retransmitted shortly. In this way, ideally, only a single feedback message will reach S, which in turn subsequently retransmits m.
Scalable Reliable Multicasting: Hierarchical Solutions Basic solution: Construct a hierarchical feedback channel in which all submitted messages are sent only to the root. Intermediate nodes aggregate feedback messages before passing them on. Question: What's the main problem with this solution? Observation: Intermediate nodes can easily be used for retransmission purposes The main problem with hierarchical solution is the construction of the tree. In many cases, a tree needs to be constructed dynamicaly. 为了简化,我们假定只有一个发送者需要向一个非常大的接受组进行多播。接受组分为很多子组,组织成树的形式。包含发送者的子组构成了树的根。在每个子组中,任何一种可以为小的组工作的可靠多播方案都可以使用。每个子组都指定一个本地协调者,它负责处理子组中包含的接受者的重发请求。
Summary so far Reliable communication Reliable multicast: Add more redundant data for error detection and correction Retransmissions Reliable multicast: Simple reliability: No messages lost Virtually synchronous multicast Problem: What does reliable delivery means in the presence of process failures? Answer: multicasing is considered to be reliable when it can be guaranteed that all nonfaulty group members receive the message. 发生m丢失的可能情况只有一种,即组播发送者在编组图变化期间出现崩溃故障。这正是我们要引入的另一类具有不同可靠程度的组播协议。这种协议容许发送者出现故障,但发送者在故障前组播的信件必须满足all-or-nothing提交语义。Birman等人首先提出这种协议,并把满足上述性质的组播通信称为虚同步(virtually synchrony)。确切地说,一个实现可靠组播通信的算法是虚同步算法,必须满足下面两个条件: 令Gt = { Pi: 当前编组成员} 为时间t的编组图,则Gt中所有成员Pi都必须有一致的观点,即Pi所有都同意Gt。 2)如果一封组播信件m在Gt变化之前发出,则要么所有Pi没有接收到m,要么所有Pi在变化之前都接收到m。
Atomic Multicast Idea: Formulate reliable multicasting in the presence of process failures in terms of process groups and changes to group membership: Guarantee: A message is delivered only to the non-faulty members of the current group. All members should agree on the current group membership. 进程P3崩溃了。但是在崩溃之前它成功地将消息多播到了进程P2和P4,但是没有多播到P1。但是虚拟同步保证这个消息根本不会被传送,这就有效地建立起一种情况,使得在P3崩溃之前消息从来没有被发送。
Virtual Synchrony (1/2) Essence: Consider views V = RCV(c) U SND(c) Processes are added or deleted from a view V through view changes to V∗; a view change is to be executed locally by each P ∈ V ∩ V∗ (1) For each consistent state, there is a unique view on which all its members agree. Note: implies that all non-faulty processes see all view changes in the same order (2) If message m is sent to V before a view change vc to V*, then either all P ∊ V that execute vc receive m, or no processes P ∊ V that execute vc receive m. Note: all non-faulty members in the same view get to see the same set of multicast messages. (3) A message sent to view V can be delivered only to processes in V, and is discarded by the following views A reliable multicast algorithm satisfying (1)–(3) is virtually synchronous
Virtual Synchrony (2/2) A sender to a view V need not be member of V If a sender S ∊ V crashes, its multicast message m is flushed before S is removed from V. m will never be delivered after the point that V changes Note: Messages from S may still be delivered to all, or none (nonfaulty) processes in V before they all agree on a new view to which S does not belong If a receiver P fails, a message m may be lost but can be recovered as we know exactly what has been received in V. Alternatively, we may decide to deliver m to members in V– {P } Observation: Virtually synchronous behavior can be seen independent from the ordering of message delivery. The only issue is that messages are delivered to an agreed upon group of receivers. 虚拟同步的原理是所有的多播都在视图改变之间进行。换句话说,视图改变作为一个屏障,不能跨越它进行多播。
Message Ordering (1/2) Observation: Virtually synchronous behavior is independent from the ordering of message delivery. The only issue is that messages are delivered to an agreed upon group of receivers. Process P1 Process P2 Process P3 Process P4 sends m1 receives m1 receives m3 sends m3 sends m2 sends m4 receives m2 receives m4 Four processes in the same group with two different senders, and a possible delivery order of messages under FIFO-ordered multicasting
Message ordering (2/2) Six different versions of virtually synchronous reliable multicasting. Multicast Basic Message Ordering Total-ordered Delivery? Reliable multicast None No FIFO multicast FIFO-ordered delivery Causal multicast Causal-ordered delivery Atomic multicast Yes FIFO atomic multicast Causal atomic multicast
Implementing Virtual Synchrony (1/3) Assumptions: Point-to-point communication in the underlying network: Reliable, in-order delivery (TCP-like semantics) Multicast implemented as a sequence of point-to-point transmissions But sender can fail before sending to all receivers Requirements All messages send while in view Gi are delivered to all non-faulty processes in in Gi before the next group membership change Gi+1 might be installed before m is delivered 考虑虚拟同步可靠多播的实现。实现这种多播的一个例子是Isis,一个已经实际使用了几年的容错分布式系统。首先来讨论一下在(Birman等1991)中说明的有关它的实现的一些问题。 Isis中的可靠多播使用了底层网络中可用的可靠的点到点通信工具,特别是TCP。把消息m发送给一个进程组是通过把m可靠地发送给每个组成员来实现的。尽管可以保证每个传输都成功,但还是不能保证所有的组成员都接收到m。特别是,发送者可能在把m传输给每个成员之前崩溃了。 除了使用可靠的点到点通信以为,Isis还假定来自同一个源的消息在通信层按照发送它们的顺序接收。在实践中,可以使用点到点通信的TCP连接来解决这个问题。 Qs How to detect a process is missing a message How to detect & propagate view changes
Implementing Virtual Synchrony (2/3) Solution clue: Every process in Gi keeps m until it knows for sure that all other members in Gi have received m Terminology: m is stable if received by all processes in Gi Only stable messages are delivered
Implementing Virtual Synchrony (3/3) Algorithm sketch P detects a view change Forwards any unstable message in Gi to all processes in Gi P sends a ‘flush’ message P collects a ‘flush response’ from everyone P installs new view Q (another process) When receiving m in the view Gi it believes, it delivers m (after ordering) When receiving a flush message Multicasts all its unstable messages Sends the ‘flush response’ Installs new view Control messages so that each process knows what are the messages received by everyone else.
Implementing Virtual Synchrony – Example Process 4 notices that process 7 has crashed, sends a view change Process 6 sends out all its unstable messages, followed by a flush message Process 6 installs the new view when it has received a flush message from everyone else
Virtual Synchrony Implementation (1/3) The current view is known at each P by means of a delivery list dest[P] If P ∈ dest[Q] then Q ∈ dest[P] Messages received by P are queued in queue[P] If P fails, the group view must change, but not before all messages from P have been flushed Each P attaches a (stepwise increasing) timestamp with each message it sends Assume FIFO-ordered delivery; the highest numbered message from Q that has been received by P is recorded in rcvd[P][Q] The vector rcvd[P][] is sent (as a control message) to all members in dest[P] Each P records rcvd[Q][] in remote[P][Q]
Virtual Synchrony Implementation (2/3) Observation: remote[P][Q] shows what P knows about message arrival at Q 1 2 3 1 5 2 2 2 2 4 3 3 1 4 5 4 4 2 2 4 __________ min 2 1 1 4 A message is stable if it has been received by all Q ∈ dest[P] (shown as the min vector) Stable messages can be delivered to the next layer (which may deal with ordering). Note: Causal message delivery comes for free As soon as all messages from the faulty process have been flushed, that process can be removed from the (local) views
Virtual Synchrony Implementation (3/3) Remains: What if a sender P failed and not all its messages made it to the nonfaulty members of the current view? Solution: Select a coordinator which has all (unstable) messages from P, and forward those to the other group members. Note: Member failure is assumed to be detected and subsequently multicast to the current view as a view change. That view change will not be carried out before all messages in the current view have been delivered.
Distributed commit (++) Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Distributed commit (++) Recovery =
Distributed Commit • Two-phase commit • Three-phase commit Essential issue: Given a computation distributed across a process group, how can we ensure that either all processes commit to the final result, or none of them do (atomicity)? 前面讨论的原子多播问题是一个称为分布式提交的更一般化问题中的一个例子。
Two-Phase Commit Model: The client who initiated the computation acts as coordinator; processes required to commit are the participants Phase 1a: Coordinator sends vote-request to participants (also called a pre-write) Phase 1b: When participant receives vote-request it returns either vote-commit or vote-abort to coordinator. If it sends vote-abort, it aborts its local computation Phase 2a: Coordinator collects all votes; if all are vote-commit, it sends global-commit to all participants, otherwise it sends global-abort Phase 2b: Each participant waits for global-commit or global-abort and handles accordingly. The first phase is the voting phase, and consists of steps 1 and 2. The second phase is the decision phase, and consists of steps 3 and 4.
Where does the waiting/blocking occur? Two-Phase Commit FSMs Coordinator Participant Where does the waiting/blocking occur? Coordinator-WAIT Participant-INIT Participant-READY
Two-Phase Commit Recovery (1/2) Coordinator Participant Wait State Wait States What happens in case of a crash? How do we detect a crash? If timeout in Coordinator-WAIT, then abort. If timout in Participant-INIT, then abort. If timout in Participant-READY, then need to find out if globally committed or aborted. Just wait for Coordinator to recover. Check with others. Must use timeouts to detect failures.
Two-Phase Commit Recovery (2/2) Coordinator Wait State Participant Wait States If in Participant-READY, and we wish to check with others: If Q is in COMMIT, then commit. If Q is in ABORT, then ABORT. If Q in INIT, then can safely ABORT. If all in READY, nothing can be done. If Q in INIT,当协调者已经向所有参与者发送了VOTE_REQUEST消息,但是这个消息只到达P(然后它用VOTE_COMMIT消息作为应答),而没有到达Q时就是这种情况。换句话说,协调者在多播VOTE_REQUEST时崩溃了。在这种情况下,中止事务是安全的,P和Q都可以把状态转换为ABORT。
Three-Phase Commit
Three-Phase Commit (1/2) The states of the coordinator and each participant satisfy the following two conditions: There is no single state from which it is possible to make a transition directly to either a COMMIT or an ABORT state. There is no state in which it is not possible to make a final decision, and from which a transition to a COMMIT state can be made.
Three-Phase Commit (2/2) (a) The finite state machine for the coordinator in 3PC. (b) The finite state machine for a participant.
Distributed commit (++) Outline Basic concepts Process resilience Reliable client-server communication (++) Reliable group communication Distributed commit (++) Recovery =
Recovery Introduction Checkpointing Message logging
Recovery: Background Essence: When a failure occurs, we need to bring the system into an error-free state: Forward error recovery: Find a new state from which the system can continue operation Backward error recovery: Bring the system back into a previous error-free state Practice: Use backward error recovery, requiring that we establish recovery points Observation: Recovery in distributed systems is complicated by the fact that processes need to cooperate in identifying a consistent state from where to recover Examples of backward recovery? Forward recovery? File system backups Erasure codes,这种方法是从其他成功传送的分组中建立丢失的分组。
Consistent Checkpoints Requirement: Every message that has been received is also shown to have been sent in the state of the sender Recovery line: Assuming processes regularly checkpoint their state, the most recent consistent global checkpoint. Observation: If and only if the system provides reliable communication, should sent messages also be received in a consistent state C1-1 C2-2 C1-3 C1-2 C1-4 C2-1 C2-3 C1-5 M1 M2 R2 R1 R3 Is C1-3 and C2-2 (R1) consistent? Yes. Is C1-4 and C2-3 (R2) consistent? No. Is C1-5 and C2-3 (R3) consistent? Maybe.
Cascaded Rollback Observation: If checkpointing is done at the “wrong” instants, the recovery line may lie at system startup time ⇒ cascaded rollback Known as the “domino effect”. C1-1 C1-2 C1-3 C2-1 C2-2 C2-3
Checkpointing: Stable Storage Principle: Replicate all data on at least two disks, and keep one copy “correct” at all times. After a crash: If both disks are identical: you’re in good shape. If one is bad, but the other is okay (checksums): choose the good one. If both seem okay, but are different: choose the main disk. If both aren’t good: you’re not in a good shape. 要恢复到先前的状态,就有必要安全地存储恢复所需的信息。在这种情况下,安全性意味着恢复信息不仅要经得住进程崩溃与站点故障的考验,而且还要能经得住各种存储介质故障的考验。 稳定存储可以使用一对普通的磁盘来实现,如图所示。驱动器2中的每个块都是驱动器1中对应块的准确拷贝。当更新一个块时,首先对驱动器1中的块进行更新及验证,然后对驱动器2上的相同块进行相同的工作。
Independent Checkpointing Essence: Each process independently takes checkpoints, with the risk of a cascaded rollback to system startup. Let CP[i](m) denote m-th checkpoint of process Pi and INT[i](m) the interval between CP[i](m − 1) and CP[i](m) When process Pi sends a message in interval INT[i](m), it piggybacks (i,m) When process Pj receives a message in interval INT[j](n), it records the dependency INT[i](m)→INT[j](n) The dependency INT[i](m)→INT [j](n) is saved to stable storage when taking checkpoint CP[j](n) Observation: If process Pi rolls back to CP[i](m−1), Pj must roll back to CP[j](n−1). 实现独立的检查点需要记录依赖关系 计算恢复线需要对每个进程在设置检查点时记录的时间间隔依赖关系进行分析。Without going into further details, it turns out that such calculations are fairly complex and do not justify the need for independent checkpoint in comparison to coordinated checkpointing.
Coordinated Checkpointing There are distributed snapshot techniques that can help, but complex. An alternative is to use a global coordinator. Multicast a CHECKPOINT_REQUEST message. Upon receipt, take a local checkpoint, block any new messages the application gives, and sends an ACK. When coordinator gets an ACK from all processes, it sends back CHECKPOINT_DONE. P1 ACK ACK C CR CD CR CD How do we get the global coordinator? Is the second case possible? Coordinated Checkpointing Essence: Each process takes a checkpoint after a globally coordinated action Question: What advantages are there to coordinated checkpointing? Simple solution: Use a two-phase blocking protocol: • A coordinator multicasts a checkpoint request message • When a participant receives such a message, it takes a checkpoint, stops sending (application) messages, and reports back that it has taken a checkpoint • When all checkpoints have been confirmed at the coordinator, the latter broadcasts a checkpoint done message to allow all processes to continue Observation: It is possible to consider only those processes that depend on the recovery of the coordinator, and ignore the rest M1 ACK ACK P2
Message Logging Alternative: Instead of taking an (expensive) checkpoint, try to replay your (communication) behavior from the most recent checkpoint⇒store messages in a log Assumption: We assume a piecewise deterministic execution model: The execution of each process can be considered as a sequence of state intervals Each state interval starts with a nondeterministic event (e.g., message receipt) Execution in a state interval is deterministic Conclusion: If we record nondeterministic events (to replay them later), we obtain a deterministic execution model that will allow us to do a complete replay Question: Why is logging onlymessages not enough? Question: Is logging only nondeterministic events enough? Piecewise deterministic model(分段确定模式)
Message Logging and Consistency Problem: When should we actually log messages? Issue: Avoid orphans: Process Q has just received and subsequently delivered messages m1 and m2 Assume that m2 is never logged. After delivering m1 and m2, Q sends message m3 to process R Process R receives and subsequently delivers m3 Goal: Devise message logging schemes in which orphans do not occur The situation after the recovery of Q is inconsistent with that before its recovery. In particular, R holds a message (m3) that had been sent before the crash, but whose receipt and delivery do not take place when replaying what had happened before the crash. Such inconsistencies should obviously be avoided.
Message-Logging Schemes (1/2) HDR[m] : The header of message m containing its source, destination, sequence number, and delivery number The header contains all information for resending a message and delivering it in the correct order (assume data is reproduced by the application) A message m is stable if HDR[m] cannot be lost (e.g., because it has been written to stable storage) DEP[m]: The set of processes to which message m has been delivered, as well as any message that causally depends on delivery of m COPY[m]: The set of processes that have a copy of HDR[m] in their volatile memory If C is a collection of crashed processes, then Q C is an orphan if there is a message m such that Q ∈ DEP[m] and COPY[m] ⊆ C 如果一个进程依赖于m,但是没有办法来重放m的传输时它就是一个孤儿进程。
Message-Logging Schemes (2/2) Goal: No orphans means that for each message m, DEP[m] ⊆ COPY[m] Pessimistic protocol: for each nonstable message m, there is at most one process dependent on m, that is |DEP[m]| ≤ 1 Consequence: An unstable message in a pessimistic protocol must be made stable before sending a next message Optimistic protocol: for each unstable message m, we ensure that if COPY[m] ⊆ C, then eventually also DEP[m] ⊆ C, where C denotes a set of processes that have been marked as faulty Consequence: To guarantee that DEP[m] ⊆ C, we generally rollback each orphan process Q until Q DEP[m] As pointed out in (Elnozahy et al., 1996), pessimistic logging is so much simpler than optimistic approaches, that it is the preferred way of message logging in practical distributed systems design.