Distributed Computing


1 Distributed Computing
Overview

2 Agenda
What is distributed computing
Why distributed computing
Common architectures
Best practices
Case studies: Condor; Hadoop – HDFS and MapReduce

3 What is Distributed Computing/System?
A field of computer science that studies distributed systems, and the use of distributed systems to solve computational problems.
Distributed system
Wikipedia: there are several autonomous computational entities, each of which has its own local memory. The entities communicate with each other by message passing.
Operating System Concepts: the processors communicate with one another through various communication lines, such as high-speed buses or telephone lines. Each processor has its own local memory.

4 What is Distributed Computing/System?
Distributed program: a computer program that runs in a distributed system.
Distributed programming: the process of writing such programs.[1]

5 What is Distributed Computing/System?
Common properties
Fault tolerance
When one or more nodes fail, the system as a whole keeps working, losing only some performance. The status of each node needs to be checked.
Each node plays a partial role
Each computer has only a limited, incomplete view of the system, and may know only one part of the input.
Resource sharing
Each user can share the computing power and storage resources of the system with other users.
Load sharing
Dispatching tasks across the nodes spreads the load over the whole system.
Easy to expand
Adding nodes should take little time, ideally none.
Resource sharing: if a number of different sites are connected to one another, a user at site A can connect to and use the resources at site B. For example, with HDFS in Hadoop, every user shares the entire computing power and storage space with the other users.
Load balancing: if a particular computation can be split into several tasks that run concurrently, a distributed system can execute those tasks on several nodes.

6 Why Distributed Computing?
The nature of the application
Some applications inherently need the network to connect many computers.
Performance
Computing intensive
Tasks that spend most of their time on computation, for example estimating π (see the sketch below).
Data intensive
Tasks that deal with a large amount of data or very large files, for example Facebook or the LHC (Large Hadron Collider).
Price/performance: a group of cheaper machines is compared against one high-end machine.
Facebook: currently about 1 PB of data. LHC: about 15 PB of data per year.
Robustness
No SPOF (Single Point Of Failure): other nodes can take over the task that was running on the failed node. A distributed system can be more reliable than a non-distributed system, as there is no single point of failure.
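As a toy illustration of splitting a computing-intensive job (the π example above) across workers, here is a minimal sketch in plain Java. The threads stand in for separate nodes, and the worker and sample counts are arbitrary assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Minimal sketch: estimate pi by Monte Carlo sampling, splitting the work
// across several workers (threads here stand in for separate nodes).
public class PiEstimate {
    public static void main(String[] args) throws Exception {
        int workers = 4;                       // assumed worker count
        long samplesPerWorker = 2_000_000L;    // assumed sample count
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<Long>> parts = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            parts.add(pool.submit(() -> {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();
                long hits = 0;
                for (long s = 0; s < samplesPerWorker; s++) {
                    double x = rnd.nextDouble(), y = rnd.nextDouble();
                    if (x * x + y * y <= 1.0) hits++;   // inside the quarter circle
                }
                return hits;
            }));
        }
        long totalHits = 0;
        for (Future<Long> f : parts) totalHits += f.get();   // gather partial results
        pool.shutdown();
        System.out.println("pi ~= " + 4.0 * totalHits / (workers * samplesPerWorker));
    }
}
```

Each worker computes its partial count independently; the only communication is gathering the partial results at the end, which is what makes this kind of task easy to distribute.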

7 Common Architectures Communicate and coordinate works among concurrent processes Processes communicate by sending/receiving messages Synchronous/Asynchronous 這邊分類是以資料溝通的觀點來看 一個process含有program counter和address space 2. Message passing是用於在不同process間互相溝通的方式, 其中每個process都有它自己的記憶體空間 3. Process 間的溝通有好幾種, 包括 同步式及非同步式 同步式,sender和receiver會等待資料傳輸完才會繼續處理,好處是這樣程式可以簡化,因為有synchronized point,另外一個好處是就無需buffer,因為sender必須等到receiver收完才會繼續跑,缺點就是會需要等 非同步式,sender和receiver就不會等待資料傳輸完才會繼續處理,好處這樣就可以加快運算的速度 同步可以架構於非同步式之上,做法就是讓sender等待receiver傳送一個acknowledge的訊息 非同步需要考慮的是當buffer滿時的處理,是要將sender做暫停傳送資料,則要小心會有deadlock的情況,如果是將buffer的資料移掉,那訊息就不一樣,就會不可靠 -把資料將一個process的memory區塊移到另一個process的記憶體區塊 There are several types in parallel computing modes: SIMD (single instruction on multiple data) MIMD: different instruction on different data SPMD: Single program on multiple data. Message passing is belonging to MIMD/SPMD because it can use to be process multiple data by 1 program/single instruction. Data is sent by one process and received by another MPI is just a library specification of Message Processing. Non language/compiler spec Non special implementation or a product For everyone – developer, end users, library writers

8 Common Architectures Master/Slave architecture
Master/slave is a model of communication where one device or process has unidirectional (one-way) control over one or more other devices.
Database replication: the source database can be treated as the master and the destination database as a slave.
Client-server: web browsers and web servers.

9 Common Architectures Data-centric architecture
Using a standard, general-purpose relational database management system instead of customized in-memory or file-based data structures and access methods
Using dynamic, table-driven logic instead of logic embodied in previously compiled programs
Stored procedures instead of logic running in middle-tier application servers
Shared databases as the basis for communicating between parallel processes, instead of direct inter-process communication via message passing
Data-centric, also called database-centric, architecture describes the role the database plays in a software architecture. A data-centric architecture can combine any of the following:
Storing data in a standard, general-purpose database rather than directly in memory or in files. Since many operating systems now ship with free databases, developers can simply use them, especially when rapid development is needed.
Using dynamic, table-driven logic rather than writing large amounts of logic in the program. Table-driven means the logic depends on the current contents of the database, which makes programs simpler and more flexible.
Using stored procedures to write and query data rather than relying on logic in middle-tier application servers. Complex transactions are then controlled and executed entirely on the server side; the client only issues calls and receives results, so the application can focus on the user interface and flow control (MVC).
Using a shared database as the medium for communication between processes, rather than direct inter-process communication via message passing (see the sketch below).
A potential benefit of a database-centric architecture is that it simplifies application design: the database management system itself provides transactions and indexing, giving high reliability, performance, and capacity.
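A minimal sketch of the last point: two processes communicating through a shared database table instead of direct message passing. It uses plain JDBC; the connection URL, credentials, and the tasks(id, payload, status) table are assumptions made up for illustration.

```java
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: two processes using a shared table as their communication
// channel. The JDBC URL, credentials, and the tasks(id, payload, status)
// table are assumptions for illustration only.
public class SharedDbChannel {
    static final String URL = "jdbc:postgresql://dbhost/queue";  // assumed

    // Producer process: enqueue a work item as a row.
    static void enqueue(String payload) throws SQLException {
        try (Connection c = DriverManager.getConnection(URL, "app", "secret");
             PreparedStatement ps = c.prepareStatement(
                     "INSERT INTO tasks(payload, status) VALUES (?, 'PENDING')")) {
            ps.setString(1, payload);
            ps.executeUpdate();
        }
    }

    // Consumer process: pick up pending rows, process them, mark them done.
    static void drainOnce() throws SQLException {
        try (Connection c = DriverManager.getConnection(URL, "app", "secret")) {
            List<Long> ids = new ArrayList<>();
            try (Statement s = c.createStatement();
                 ResultSet rs = s.executeQuery(
                         "SELECT id, payload FROM tasks WHERE status = 'PENDING'")) {
                while (rs.next()) {
                    System.out.println("processing " + rs.getString("payload"));
                    ids.add(rs.getLong("id"));
                }
            }
            try (PreparedStatement done = c.prepareStatement(
                         "UPDATE tasks SET status = 'DONE' WHERE id = ?")) {
                for (long id : ids) {
                    done.setLong(1, id);
                    done.executeUpdate();
                }
            }
        }
    }
}
```

The database provides the durability and transactional guarantees that a hand-rolled message channel would otherwise have to implement itself.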

10 Best Practice Data Intensive or Computing Intensive
Data size and the amount of data
The attributes of the data you consume
Computing intensive
We can move data to the nodes where we execute the jobs.
Data intensive
We can split/replicate data across different nodes, then execute our tasks on those nodes.
Reduce data copying when executing tasks
Master nodes need to know the data location
No data loss when incidents happen
SAN (Storage Area Network)
Data replication on different nodes
Synchronization
When splitting tasks across different nodes, how can we make sure these tasks stay synchronized?
Data locality; no data loss -> replication.
Bandwidth: we only have limited bandwidth, and it is impractical to copy 1 TB of data over the network; it takes too much time to transfer and process. For the transfer, what can we do to improve it? For the processing, could we tag a position and resume the calculation from that point?
Data intensive examples: Facebook, the LHC.
Master nodes need to know data location: the Hadoop namenode keeps the data locations (which datanode holds which block) in memory.
Data loss: a typical grid uses a SAN (Storage Area Network), which provides backups so the chance of data loss is minimized; Hadoop instead splits every file into blocks and replicates them across the datanodes.
What is a SAN? SAN technology is widely used in enterprises to provide fast, manageable, fault-tolerant, and flexible storage services, such as data storage, backup, and disaster recovery. A SAN is not a single device or a single protocol; it is a service architecture combining hardware (fibre channel, HBA cards, high-speed switches, servers, disk arrays) and software (management software, initiator and target software, drivers). With a SAN, individual storage devices are linked together for integrated management and use. Its biggest value lies not only in storing data but in fault tolerance and disaster recovery, which is the main reason enterprises adopt it: storage resources can be managed centrally, and services keep running, recovering in the shortest possible time from disasters (fire, flood) or incidents (viruses) so that losses are avoided.
Advantages of a SAN: storage devices are shared, which is economical, since through the network clients can use a storage resource without being directly attached to it; management is effective, since management software makes it easier to manage stored data and plan backups; fault tolerance lowers risk, with features from simple mirroring to advanced snapshots that reduce the risk of data loss or service interruption.
A SAN uses a client/server architecture: the side providing storage is the target, the side requesting it is the initiator. Target and initiator are connected by a high-speed network, usually fibre channel; the interface is an HBA (Host Bus Adapter), and the fabric is built with fibre channel switches. These components emphasize speed and stability, but they are expensive. As technology evolves, IP-based interfaces such as SAN/IP and iSCSI have appeared that run over existing Ethernet.
Synchronization: when tasks process data on different nodes, we have to consider synchronization. What about deadlock?

11 Best Practice
Robustness
Still safe when one or more nodes fail
Failed nodes should need little or no manual work to recover once they come back online
Condor – restart the daemon
Failure detection
When any node fails, the master nodes can detect the situation, e.g. by heartbeat detection (see the sketch below)
Apps/users don't need to know when a partial failure happens; their tasks are restarted on other nodes for them
Heartbeat detection: each device/process periodically reports its status to the master; if no heartbeat arrives within the expected interval, a switchover begins so that other nodes can continue the work.
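A minimal sketch of heartbeat-based failure detection in plain Java (this is an illustration, not Condor's or Hadoop's actual protocol; the timeout value and node names are arbitrary): workers report in periodically, and the master marks a worker failed when its last heartbeat is too old.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch: heartbeat-based failure detection. Workers call
// heartbeat() periodically (over RPC/UDP in a real system); the master
// scans for nodes whose last heartbeat is older than the timeout.
public class HeartbeatMonitor {
    private static final long TIMEOUT_MS = 10_000;   // assumed timeout
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    // Called (remotely, in a real system) by each worker every few seconds.
    public void heartbeat(String nodeId) {
        lastSeen.put(nodeId, System.currentTimeMillis());
    }

    // Periodically invoked on the master to detect failed workers.
    public void detectFailures() {
        long now = System.currentTimeMillis();
        lastSeen.forEach((node, seen) -> {
            if (now - seen > TIMEOUT_MS) {
                System.out.println(node + " missed its heartbeat; reschedule its tasks");
                lastSeen.remove(node);   // ConcurrentHashMap tolerates removal here
            }
        });
    }

    public static void main(String[] args) throws Exception {
        HeartbeatMonitor master = new HeartbeatMonitor();
        master.heartbeat("worker-1");
        Thread.sleep(50);
        master.detectFailures();   // worker-1 is still fresh, so nothing happens
    }
}
```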

12 Best Practice
Network issues
Bandwidth
Bandwidth has to be considered when files must be copied from one node to others because we want to run a task on nodes that do not hold the data.
Scalability
Easy to expand
Hadoop – modify the configuration and start the daemons
Optimization
What can we do if the performance of some nodes is poor?
Monitor the performance of each node, based on information exchanges such as heartbeats or logs
Resume the same task on another node

13 Best Practice App/User shouldn’t know how to communicate between nodes
User mobility – users can access the system from one entry point or from anywhere
Grid – UI (User Interface)
Condor – submit machine
User interface: a set of APIs and command-line tools that applications and clients can use; typically, logging in to that machine is enough to use the storage space and computing power of the grid environment.

14 Case study - Condor Condor Computing intensive jobs Queuing policy
Matches tasks to computing nodes
Resource classification
Each resource can advertise its attributes, and the master classifies resources accordingly (see the submit file sketch below).
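To make the queuing policy and matchmaking concrete, here is a small HTCondor submit description file; the executable name, file names, and resource values are made-up examples. The requirements expression is matched against the attributes that each execution machine advertises.

```
universe       = vanilla
executable     = analyze
arguments      = input.dat
output         = analyze.out
error          = analyze.err
log            = analyze.log
request_memory = 1024
requirements   = (OpSys == "LINUX") && (Arch == "X86_64")
queue
```

Submitting this with condor_submit hands the job to the schedd; the central manager's negotiator later matches it against the machines whose advertised attributes satisfy the requirements.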

15 Case study - Condor From http://www.cs.wisc.edu/condor/
Central manager: a Condor pool normally has only one central manager. It collects information (condor_collector) and negotiates between resources and resource requests (condor_negotiator). These two responsibilities are handled by two different daemons, so two separate machines could run them, but usually a single machine runs both. The central manager plays a critical role: if it goes down, no daemon can match resources with resource requests (matchmaking), so this machine should be very reliable, able to stay up for long stretches or at least to reboot quickly, and it should have good network connections to all nodes, since every status update is sent to it.
Execution machine: every machine, including the central manager, can be an execution machine that runs jobs. It does not need to be high-end; the only resource that really matters is disk space, because if a job dumps core, the dump is written to the execution machine's local disk before being sent back to the submit machine, and if disk space is scarce, Condor simply limits the size of the dump. In general, the better the machine (faster CPU, more RAM, more swap space), the larger the resource requests it can serve; if user jobs have no special requirements, every machine in the pool can serve them.
Submit machine: every machine, including the central manager, can be a submit machine. Its requirements are higher than an execution machine's: for every job a user submits that is running remotely, another process (the condor_shadow) runs on the submit machine to handle communication with the remote machine, so a machine with many running jobs needs a lot of memory and swap space. The submit machine also stores many checkpoint files, so if your jobs have large memory images and you submit many of them, you need a lot of disk space as well. A checkpoint file is a snapshot of a program's execution state; if the program cannot run to completion for some reason, it can be restarted later from the checkpoint.
The condor_shadow runs on the machine where a request was submitted and acts as the resource manager for that request. Jobs linked for Condor's standard universe perform remote system calls via the condor_shadow: any system call performed on the remote execute machine is sent over the network back to the condor_shadow, which actually performs the call (such as file I/O) on the submit machine and sends the result back to the remote job. The shadow also makes decisions about the request (where checkpoint files should be stored, how certain files should be accessed, and so on).
Checkpoint server: one machine in the pool can be configured as a checkpoint server; this is optional. It stores all the checkpoint files for the jobs submitted in the pool, so it needs a lot of disk space and a good network connection to every node, since checkpoint traffic can be heavy.

16 Case study - Condor Role Central Manger Execution machine
The collector of information, and the negotiator between resources and resource requests Execution machine Responsible for executing condor tasks Submit machine Responsible for submitting condor tasks Checkpoint servers Responsible for storing all checkpoint files for the tasks Every machine in a Condor pool can serve a variety of roles. Most machines serve more than one role simultaneously. Certain roles can only be performed by single machines in your pool. The following list describes what these roles are and what resources are required on the machine that is providing that service: Central Manager There can be only one central manager for your pool. The machine is the collector of information, and the negotiator between resources and resource requests. These two halves of the central manager's responsibility are performed by separate daemons, so it would be possible to have different machines providing those two services. However, normally they both live on the same machine. This machine plays a very important part in the Condor pool and should be reliable. If this machine crashes, no further matchmaking can be performed within the Condor system (although all current matches remain in effect until they are broken by either party involved in the match). Therefore, choose for central manager a machine that is likely to be up and running all the time, or at least one that will be rebooted quickly if something goes wrong. The central manager will ideally have a good network connection to all the machines in your pool, since they all send updates over the network to the central manager. All queries go to the central manager. Execute Any machine in your pool (including your Central Manager) can be configured for whether or not it should execute Condor jobs. Obviously, some of your machines will have to serve this function or your pool won't be very useful. Being an execute machine doesn't require many resources at all. About the only resource that might matter is disk space, since if the remote job dumps core, that file is first dumped to the local disk of the execute machine before being sent back to the submit machine for the owner of the job. However, if there isn't much disk space, Condor will simply limit the size of the core file that a remote job will drop. In general the more resources a machine has (swap space, real memory, CPU speed, etc.) the larger the resource requests it can serve. However, if there are requests that don't require many resources, any machine in your pool could serve them. Submit Any machine in your pool (including your Central Manager) can be configured for whether or not it should allow Condor jobs to be submitted. The resource requirements for a submit machine are actually much greater than the resource requirements for an execute machine. First of all, every job that you submit that is currently running on a remote machine generates another process on your submit machine. So, if you have lots of jobs running, you will need a fair amount of swap space and/or real memory. In addition all the checkpoint files from your jobs are stored on the local disk of the machine you submit from. Therefore, if your jobs have a large memory image and you submit a lot of them, you will need a lot of disk space to hold these files. This disk space requirement can be somewhat alleviated with a checkpoint server (described below), however the binaries of the jobs you submit are still stored on the submit machine. 
Checkpoint Server One machine in your pool can be configured as a checkpoint server. This is optional, and is not part of the standard Condor binary distribution. The checkpoint server is a centralized machine that stores all the checkpoint files for the jobs submitted in your pool. This machine should have lots of disk space and a good network connection to the rest of your pool, as the traffic can be quite heavy. The individual Condor daemons that implement these roles are described on the backup slides.

17 Case study - Condor Robustness One execution machine fails Recovery
We can execute the same task on other nodes.
Recovery
Only the daemon needs to be restarted once the failed nodes are back online.

18 Case study - Condor Resource sharing
Each Condor user can share computing power with other Condor users.
Synchronization
Users must handle it themselves: MPI jobs can run in a Condor pool, but users need to think about synchronization and deadlock.
Failure detection
The central manager can tell when a node fails, based on the update notifications sent by the nodes.
Scalability
Only a few commands are needed when new nodes come online.

19 Case study - Hadoop
HDFS
Namenode: manages the file system namespace and regulates access to files by clients; determines the mapping of blocks to DataNodes.
Datanode: manages the storage attached to the node it runs on; saves CRC checksums; sends heartbeats to the namenode. Each file is split into blocks, and each block is stored on several datanodes.
Secondary Namenode: responsible for merging the fsImage and the EditLog.

20 Case study - Hadoop 當client需要讀一個檔案時,會用 RPC 方式向namenode要求前面幾個block及其備份資料在那幾個datanode的資訊, 之後client從namenode拿到block資訊在那些datanode之後,就會向”最接近”的節點抓block內部的資料出來,至於何謂最接近,hadoop是用RACK資訊來決定,可以在啟動hadoop時將這個資訊帶入,那麼就可以讓hadoop判斷何謂”最接近”的節點,資料會以stream方式傳回client, 當第一個block取完,就會重複找第2個block所在的datanode, 當前面幾個block資料都取出後,就會向namenode 要下面幾個block的資訊,之後也是找datanode,傳回data,…, 等等的,這邊請注意, client只有在要取block資訊時才會跟namenode溝通,剩下傳輸資料是和datanode直接要資料,這樣好處是當client很多時,namenode也不會因為要serve很多client而成為瓶頸 同一個節點 同一個rack 同一個DC的不同rack 不同DC 當需要write data時,client會向namenode發RPC,在namenode建立一個長度為0的檔案, namenode會執行一連串的確認動作,看看檔案在不在呀,有無權限等等,如果沒問題就會在namenode建立一個record,如果有問題,client就會收到exception, 如果可以寫,那麼client中會將資料分成一個一個packet放到data queue,有一個data streamer會負責尋問namenode找出block要存在那些datanode, data streamer就會將packet從第1個datanode以類似pipeline方式傳送到其他nodes, client這邊也會有一個queue,專門存放那些block已經傳送完畢了,這樣可以避免沒有傳送完的情形

21 Case study - Hadoop
MapReduce framework
JobTracker: responsible for dispatching jobs to the tasktrackers, and for job management such as removal and scheduling.
TaskTracker: responsible for executing tasks; usually the tasktracker launches a separate JVM to run each task.

22 Case study - Hadoop From Hadoop - The Definitive Guide
runJob() makes it easy to create a JobClient instance and submit the job with submitJob(). Once submitJob() has been called, runJob() polls the job's progress once per second and reports to the terminal whenever the status changes. submitJob() then does the following: it asks the jobtracker for a job ID; checks the job's output specification (for example, whether the output directory already exists or the permissions are wrong, in which case the MapReduce job fails); computes the input splits; and copies the resources needed to run the job (the JAR file and configuration) into HDFS. The JAR is usually replicated many times (ten by default) so that tasktrackers can fetch it easily. Finally it tells the jobtracker that a job is ready to run. When the jobtracker receives this message, it places the job in its queue; the job scheduler takes the job from the queue and initializes it, creating an object to represent the job along with bookkeeping information such as the job status. To build the list of tasks to run, the scheduler retrieves the input splits from HDFS; one map task is created per input split, while the number of reduce tasks comes from the JobConf. At this point each task gets a task ID. Each tasktracker runs a loop that keeps sending heartbeats back to the jobtracker; the heartbeat says not only that the tasktracker is alive but also whether it can accept new tasks, and if so, the jobtracker assigns tasks through the heartbeat exchange. The job scheduler uses a priority-based scheme by default to decide which job runs first; once a job is chosen, the jobtracker selects its tasks. When assigning map tasks there is a data-locality consideration: the task should run on the node holding the data, or as close to it as possible. Once a tasktracker has accepted a task, it copies the required JAR and configuration from HDFS to its local disk, creates a local working directory, unpacks the JAR, and creates a TaskRunner object to run the task. The TaskRunner launches a separate JVM to run the map or reduce task, so any bug in the user application cannot affect the tasktracker or the jobtracker. (A word-count example follows.)
From Hadoop - The Definitive Guide
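To ground the flow above, here is the canonical word-count job written against Hadoop's newer org.apache.hadoop.mapreduce API (the older JobClient.runJob() path described above behaves the same way); the input and output paths come from the command line, and waitForCompletion() submits the job and polls its progress.

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Canonical word count: mappers emit (word, 1), reducers sum the counts.
public class WordCount {
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                ctx.write(word, ONE);             // emit (word, 1)
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            ctx.write(key, new IntWritable(sum)); // emit (word, total)
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // local pre-aggregation
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```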

23 Case study - Hadoop Data replication
Data are replicated to different nodes, which reduces the possibility of data loss.
Data locality: the job is sent to the node where the data is.
Robustness
If one datanode fails, we can get the data from other nodes.
If one tasktracker fails, we can start the same task on a different node.
Recovery
Only the daemon needs to be restarted once the failed nodes are back online.

24 Case study - Hadoop Resource sharing
Each Hadoop user can share computing power and storage space with other Hadoop users.
Synchronization
No synchronization is required.
Failure detection
The namenode/jobtracker can tell when a datanode/tasktracker fails, based on heartbeats.

25 Case study - Hadoop Scalability
Only a few commands are needed when new nodes come online.
Optimization
A speculative task is launched only when a task takes too long on one node; the slower copy is killed once the other finishes.

26 References
http://en.wikipedia.org/wiki/Message_passing
Tom White, Hadoop: The Definitive Guide
Silberschatz and Galvin, Operating System Concepts

27 Backup slides

28 Message passing - Synchronous Vs. Asynchronous

29 Case study – Condor (All related daemons)
condor_master: keeping all the rest of the Condor daemons running on each machine condor_startd: represents a given resource and enforcing the policy that resource owners configure which determines under what conditions remote jobs will be started, suspended, resumed, vacated, or killed. When the startd is ready to execute a Condor job condor_starter: spawns the remote Condor job on a given machine condor_schedd: represents resource requests to the Condor pool. condor_shadow condor_collector: collecting all the information about the status of a Condor pool condor_negotiator: execute all the match-making within the Condor system condor_kbdd: notify condor_startd when machine owner condor_ckpt_server: store and retrieve checkpoint files condor_quill: builds and manages a database that represents a copy of the Condor job queue condor_master This daemon is responsible for keeping all the rest of the Condor daemons running on each machine in your pool. It spawns the other daemons, and periodically checks to see if there are new binaries installed for any of them. If there are, the master will restart the affected daemons. In addition, if any daemon crashes, the master will send to the Condor Administrator of your pool and restart the daemon. The condor_master also supports various administrative commands that let you start, stop or reconfigure daemons remotely. The condor_master will run on every machine in your Condor pool, regardless of what functions each machine are performing. condor_startd This daemon represents a given resource (namely, a machine capable of running jobs) to the Condor pool. It advertises certain attributes about that resource that are used to match it with pending resource requests. The startd will run on any machine in your pool that you wish to be able to execute jobs. It is responsible for enforcing the policy that resource owners configure which determines under what conditions remote jobs will be started, suspended, resumed, vacated, or killed. When the startd is ready to execute a Condor job, it spawns the condor_starter, described below. condor_starter This program is the entity that actually spawns the remote Condor job on a given machine. It sets up the execution environment and monitors the job once it is running. When a job completes, the starter notices this, sends back any status information to the submitting machine, and exits. condor_schedd This daemon represents resource requests to the Condor pool. Any machine that you wish to allow users to submit jobs from needs to have a condor_schedd running. When users submit jobs, they go to the schedd, where they are stored in the job queue, which the schedd manages. Various tools to view and manipulate the job queue (such as condor_submit, condor_q, or condor_rm) all must connect to the schedd to do their work. If the schedd is down on a given machine, none of these commands will work. The condor_schedd advertises the number of waiting jobs in its job queue and is responsible for claiming available resources to serve those requests. Once a schedd has been matched with a given resource, the schedd spawns a condor_shadow (described below) to serve that particular request. condor_shadow This program runs on the machine where a given request was submitted and acts as the resource manager for the request. Jobs that are linked for Condor's standard universe, which perform remote system calls, do so via the condor_shadow. 
Any system call performed on the remote execute machine is sent over the network, back to the condor_shadow which actually performs the system call (such as file I/O) on the submit machine, and the result is sent back over the network to the remote job. In addition, the shadow is responsible for making decisions about the request (such as where checkpoint files should be stored, how certain files should be accessed, etc). condor_collector This daemon is responsible for collecting all the information about the status of a Condor pool. All other daemons periodically send ClassAd updates to the collector. These ClassAds contain all the information about the state of the daemons, the resources they represent or resource requests in the pool (such as jobs that have been submitted to a given schedd). The condor_status command can be used to query the collector for specific information about various parts of Condor. In addition, the Condor daemons themselves query the collector for important information, such as what address to use for sending commands to a remote machine. condor_negotiator This daemon is responsible for all the match-making within the Condor system. Periodically, the negotiator begins a negotiation cycle, where it queries the collector for the current state of all the resources in the pool. It contacts each schedd that has waiting resource requests in priority order, and tries to match available resources with those requests. The negotiator is responsible for enforcing user priorities in the system, where the more resources a given user has claimed, the less priority they have to acquire more resources. If a user with a better priority has jobs that are waiting to run, and resources are claimed by a user with a worse priority, the negotiator can preempt that resource and match it with the user with better priority. NOTE: A higher numerical value of the user priority in Condor translate into worse priority for that user. The best priority you can have is 0.5, the lowest numerical value, and your priority gets worse as this number grows. condor_kbdd This daemon is used on Linux and Windows. On those platforms, the condor_startd frequently cannot determine console (keyboard or mouse) activity directly from the system, and requires a separate process to do so. On Linux, the condor_kbdd connects to the X Server and periodically checks to see if there has been any activity. On Windows, the condor_kbdd runs as the logged-in user and registers with the system to receive keyboard and mouse events. When it detects console activity, the condor_kbdd sends a command to the startd. That way, the startd knows the machine owner is using the machine again and can perform whatever actions are necessary, given the policy it has been configured to enforce. condor_ckpt_server This is the checkpoint server. It services requests to store and retrieve checkpoint files. If your pool is configured to use a checkpoint server but that machine (or the server itself is down) Condor will revert to sending the checkpoint files for a given job back to the submit machine. condor_quill This daemon builds and manages a database that represents a copy of the Condor job queue. The condor_q and condor_history tools can then query the database.

30 Case study – Condor (All related daemons)
condor_had: implementation of high availability of a pool's central manager through monitoring the communication of necessary daemons condor_replication: assists the condor_had daemon by keeping an updated copy of the pool's state condor_transferer: accomplish the task of transferring a file condor_lease_manager: leases in a persistent manner. Leases are represented by ClassAds condor_rooster: wakes hibernating machines based upon configuration details condor_shared_port: listen for incoming TCP packets condor_dbmsd This daemon assists the condor_quill daemon. condor_gridmanager This daemon handles management and execution of all grid universe jobs. The condor_schedd invokes the condor_gridmanager when there are grid universe jobs in the queue, and the condor_gridmanager exits when there are no more grid universe jobs in the queue. condor_credd This daemon runs on Windows platforms to manage password storage in a secure manner. condor_had This daemon implements the high availability of a pool's central manager through monitoring the communication of necessary daemons. If the current, functioning, central manager machine stops working, then this daemon ensures that another machine takes its place, and becomes the central manager of the pool. condor_replication This daemon assists the condor_had daemon by keeping an updated copy of the pool's state. This state provides a better transition from one machine to the next, in the event that the central manager machine stops working. condor_transferer This short lived daemon is invoked by the condor_replication daemon to accomplish the task of transferring a state file before exiting. condor_procd This daemon controls and monitors process families within Condor. Its use is optional in general but it must be used if privilege separation (see Section ) or group-ID based tracking (see Section ) is enabled. condor_job_router This daemon transforms vanilla universe jobs into grid universe jobs, such that the transformed jobs are capable of running elsewhere, as appropriate. condor_lease_manager This daemon manages leases in a persistent manner. Leases are represented by ClassAds. condor_rooster This daemon wakes hibernating machines based upon configuration details. condor_shared_port This daemon listens for incoming TCP packets on behalf of Condor daemons, thereby reducing the number of required ports that must be opened when Condor is accessible through a firewall. condor_hdfs This daemon manages the configuration of a Hadoop file system as well as the invocation of a properly configured Hadoop file system.

