Dremel: Interactive Analysis of Web-Scale Datasets Sergey Melnik, Andrey Gubarev, Jing Jing Long, Geoffrey Romer, Shiva Shivakumar, Matt Tolton, Theo Vassilakis (Google) VLDB 2010
A new age at Google
Before big data times …
After that: GFS, BigTable, Megastore, MapReduce
After that: an ecosystem based on MapReduce: Pregel, Caffeine, Dremel (Caffeine is Google's new indexing system)
Speed matters
Trends: Interactive Tools, Spam Detection, Web Dashboards, Network Optimization
Goal: analyze 1 PB of data in about 3 seconds
Example: data exploration
1. Googler Alice runs a MapReduce to extract billions of signals from web pages
2. Ad hoc SQL against Dremel:
   DEFINE TABLE t AS /path/to/data/*
   SELECT TOP(signal, 100), COUNT(*) FROM t . . .
3. More MR-based processing on her data (FlumeJava [PLDI'10], Sawzall [Sci.Pr.'05])
Dremel system
Trillion-record, multi-terabyte datasets at interactive speed
Scales to thousands of nodes
Fault and straggler tolerant execution
Nested data model
  Complex datasets; normalization is prohibitive
Columnar storage and processing
Tree architecture (as in web search)
Interoperates with Google's data mgmt tools
  In situ data access (e.g., GFS, Bigtable)
  MapReduce pipelines

Notes:
Dremel is a very large-scale system. Cutting an analysis over a petabyte-scale dataset down to seconds requires massive parallelism: sequential disk reads run at roughly 100 MB/s, so processing 1 TB in one second means reading from on the order of 10,000 disks in parallel (see the back-of-envelope sketch below). Google is used to doing big things with cheap machines, but the more machines there are, the more likely something breaks; at this cluster size enough fault tolerance is needed so that a few slow or failed nodes do not drag down the whole analysis.
Dremel complements MapReduce where MR's interactive query capability falls short. Like MapReduce, it runs next to the data and moves computation to the data, so it needs a file system such as GFS as its storage layer. It was never designed as a MapReduce replacement; it simply executes analyses very fast, and in practice it is often used to query MapReduce output or to prototype an analysis.
Dremel's data model is nested. Internet data is often non-relational, so a flexible data model is essential; Dremel supports a nested data model similar to JSON. A traditional relational model, with its unavoidable joins, struggles at this data scale.
Dremel stores data column-wise. With columnar storage a query scans only the columns it needs, cutting CPU and disk traffic, and columns compress well, so compression balances CPU against disk for the best overall throughput. Columnar storage of relational data is well understood; what is instructive here is that Dremel applies it to nested structures as well.
Dremel combines techniques from web search and parallel DBMSs. From web search it borrows the idea of a query tree: a relatively large, complex query is split into smaller, simpler queries that run concurrently on many nodes. Like a parallel DBMS, it offers a SQL-like interface, much as Hive and Pig do.
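A minimal back-of-envelope sketch (in Python) of the parallelism argument in the notes above; the constants (100 MB/s per disk, 1 TB in 1 second) are the figures quoted there, not measurements:

DISK_SEQ_READ_MBPS = 100           # sequential read throughput of one disk, as quoted above
DATA_TB = 1                        # amount of data to scan
TIME_BUDGET_S = 1                  # desired response time

data_mb = DATA_TB * 1024 * 1024    # 1 TB expressed in MB
required_aggregate_mbps = data_mb / TIME_BUDGET_S
disks_needed = required_aggregate_mbps / DISK_SEQ_READ_MBPS
print(f"~{disks_needed:,.0f} disks reading in parallel")   # ~10,486 disks, i.e. on the order of 10,000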
Why call it Dremel Brand of power tools that primarily rely on their speed as opposed to torque Data analysis tool that uses speed instead of raw power
Widely used inside Google Analysis of crawled web documents Tracking install data for applications on Android Market Crash reporting for Google products OCR results from Google Books Spam analysis Debugging of map tiles on Google Maps Tablet migrations in managed Bigtable instances Results of tests run on Google's distributed build system Disk I/O statistics for hundreds of thousands of disks Resource monitoring for jobs run in Google's data centers Symbols and dependencies in Google's codebase
Outline Nested columnar storage Query processing Experiments Observations
Records vs. columns
[Figure: the same nested records r1, r2, … laid out record-wise (all fields A, B, C, D, E of r1, then r2, …) vs. column-striped (all values of A across records, then B, C, D, E)]
Columnar layout: read less data, cheaper decompression
Challenge: preserve the record structure and reconstruct records from a subset of fields
Nested data model
Based on Protocol Buffers: http://code.google.com/apis/protocolbuffers

Schema, with multiplicities:
message Document {
  required int64 DocId;            [1,1]
  optional group Links {
    repeated int64 Backward;       [0,*]
    repeated int64 Forward;
  }
  repeated group Name {
    repeated group Language {
      required string Code;
      optional string Country;     [0,1]
    }
    optional string Url;
  }
}

r1:
DocId: 10
Links
  Forward: 20
  Forward: 40
  Forward: 60
Name
  Language
    Code: 'en-us'
    Country: 'us'
  Language
    Code: 'en'
  Url: 'http://A'
Name
  Url: 'http://B'
Name
  Language
    Code: 'en-gb'
    Country: 'gb'

r2:
DocId: 20
Links
  Backward: 10
  Backward: 30
  Forward: 80
Name
  Url: 'http://C'
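As a concrete illustration (not from the paper), the two sample records can be written as nested Python dicts that mirror the schema above; repeated groups become lists and omitted optional fields are simply absent:

# r1 and r2 from the slide above, as nested dicts mirroring the Document schema
r1 = {
    'DocId': 10,
    'Links': {'Forward': [20, 40, 60]},
    'Name': [
        {'Language': [{'Code': 'en-us', 'Country': 'us'}, {'Code': 'en'}],
         'Url': 'http://A'},
        {'Url': 'http://B'},
        {'Language': [{'Code': 'en-gb', 'Country': 'gb'}]},
    ],
}
r2 = {
    'DocId': 20,
    'Links': {'Backward': [10, 30], 'Forward': [80]},
    'Name': [{'Url': 'http://C'}],
}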
Column-striped representation: each column stores (value, repetition level r, definition level d)

DocId            Links.Forward       Links.Backward
value  r  d      value  r  d         value  r  d
10     0  0      20     0  2         NULL   0  1
20     0  0      40     1  2         10     0  2
                 60     1  2         30     1  2
                 80     0  2

Name.Url             Name.Language.Code    Name.Language.Country
value      r  d      value   r  d          value  r  d
http://A   0  2      en-us   0  2          us     0  3
http://B   1  2      en      2  2          NULL   2  2
NULL       1  1      NULL    1  1          NULL   1  1
http://C   0  2      en-gb   1  2          gb     1  3
                     NULL    0  1          NULL   0  1
Repetition and definition levels
r: at what repeated field in the field's path the value has repeated (0 = start of a new record)
d: how many fields in the path that could be undefined (optional or repeated) are actually present

Example, the Name.Language.Code column:
value   r  d
en-us   0  2   (r=0: start of record r1)
en      2  2   (r=2: Language repeated within the same Name)
NULL    1  1   (r=1: Name repeated; this Name has no Language)
en-gb   1  2   (r=1: Name repeated)
NULL    0  1   (r=0: start of record r2; its Name has no Language)
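A minimal sketch (my own illustration, not the paper's algorithm) of how the (value, r, d) triples for Name.Language.Code could be computed from a nested record; it is hard-coded for this one path, whose two repeated ancestors (Name, Language) give a maximum definition level of 2:

def shred_language_code(record):
    """Emit (value, r, d) for the Name.Language.Code column of one nested record."""
    triples = []
    names = record.get('Name') or [None]              # keep one row even if Name is missing
    for i, name in enumerate(names):
        r_name = 0 if i == 0 else 1                   # first Name starts the record; later Names repeat at level 1
        if name is None or not name.get('Language'):
            d = 0 if name is None else 1              # path defined only down to level d
            triples.append((None, r_name, d))
            continue
        for j, language in enumerate(name['Language']):
            r = r_name if j == 0 else 2               # later Languages repeat at level 2
            triples.append((language['Code'], r, 2))  # Code is required, so d = 2 whenever Language exists
    return triples

# With r1 and r2 from the sketch above:
# shred_language_code(r1) == [('en-us', 0, 2), ('en', 2, 2), (None, 1, 1), ('en-gb', 1, 2)]
# shred_language_code(r2) == [(None, 0, 1)]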
Record assembly FSM
[Figure: a finite-state machine whose states are the field readers (DocId, Links.Backward, Links.Forward, Name.Language.Code, Name.Language.Country, Name.Url) and whose transitions are labeled with repetition levels (0, 1, 2) that select the next field to read]
Used for record-oriented data processing (e.g., MapReduce)
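The full FSM drives readers for many columns at once. As a simplified illustration (mine, not the paper's code), here is how the nested structure of just the Name.Language.Code column can be rebuilt from its (value, r, d) stream, with r deciding at which nesting level assembly continues:

def assemble_language_code(column):
    """Rebuild records -> Names -> Language Codes from (value, r, d) triples."""
    records = []
    for value, r, d in column:
        if r == 0:
            records.append([])             # r=0: start a new record (its list of Names)
        if r <= 1 and d >= 1:
            records[-1].append([])         # r<=1 and Name present: start a new Name (its list of Codes)
        if d >= 2:
            records[-1][-1].append(value)  # Language present: emit its Code
    return records

column = [('en-us', 0, 2), ('en', 2, 2), (None, 1, 1),
          ('en-gb', 1, 2), (None, 0, 1)]
print(assemble_language_code(column))
# [[['en-us', 'en'], [], ['en-gb']], [[]]]  -> the Code structure of r1 and r2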
Reading two fields (DocId and Name.Language.Country)
s1:
DocId: 10
Name
  Language
    Country: 'us'
Name
Name
  Language
    Country: 'gb'

s2:
DocId: 20
Name

Structure of parent fields is preserved, which is useful for queries like /Name[3]/Language[1]/Country
Outline Nested columnar storage Query processing Experiments Observations
Query processing Optimized for select-project-aggregate Very common class of interactive queries Single scan Within-record and cross-record aggregation Approximations: count(distinct), top-k Joins, temp tables, UDFs/TVFs, etc.
SQL dialect for nested data

SELECT DocId AS Id,
       COUNT(Name.Language.Code) WITHIN Name AS Cnt,
       Name.Url + ',' + Name.Language.Code AS Str
FROM t
WHERE REGEXP(Name.Url, '^http') AND DocId < 20;

Output record t1:
Id: 10
Name
  Cnt: 2
  Language
    Str: 'http://A,en-us'
  Language
    Str: 'http://A,en'
Name
  Cnt: 0

Output schema:
message QueryResult {
  required int64 Id;
  repeated group Name {
    optional uint64 Cnt;
    repeated group Language {
      optional string Str;
    }
  }
}
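To make the WITHIN semantics concrete, here is a toy re-evaluation of the query over the in-memory record r1 from the earlier sketch; this is purely illustrative and is not how the engine works (it operates on columnar data):

import re

def run_example_query(record):
    """Toy evaluation of the example query on one nested record."""
    if not record['DocId'] < 20:
        return None                                  # record-level filter: DocId < 20
    out = {'Id': record['DocId'], 'Name': []}
    for name in record.get('Name', []):
        url = name.get('Url')
        if url is None or not re.match(r'^http', url):
            continue                                 # WHERE REGEXP(Name.Url, '^http')
        codes = [lang['Code'] for lang in name.get('Language', [])]
        out['Name'].append({
            'Cnt': len(codes),                                    # COUNT(...) WITHIN Name
            'Language': [{'Str': url + ',' + c} for c in codes],  # Name.Url + ',' + Code
        })
    return out

# run_example_query(r1) reproduces the output record t1 shown above:
# {'Id': 10, 'Name': [{'Cnt': 2, 'Language': [{'Str': 'http://A,en-us'},
#                                             {'Str': 'http://A,en'}]},
#                     {'Cnt': 0, 'Language': []}]}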
Serving tree
Parallelizes scheduling and aggregation
Fault tolerance; handles stragglers [Dean WSDM'09]
Designed for "small" results (<1M records)
client -> root server -> intermediate servers -> leaf servers (with local storage) -> storage layer (e.g., GFS)
Example: count()
Client query: SELECT A, COUNT(B) FROM T GROUP BY A,  where T = {/gfs/1, /gfs/2, …, /gfs/100000}

Level 1 (root server):
  SELECT A, SUM(c) FROM (R11 UNION ALL … UNION ALL R110) GROUP BY A
Level 2 (intermediate servers):
  R11 = SELECT A, COUNT(B) AS c FROM T11 GROUP BY A,  T11 = {/gfs/1, …, /gfs/10000}
  R12 = SELECT A, COUNT(B) AS c FROM T12 GROUP BY A,  T12 = {/gfs/10001, …, /gfs/20000}
  . . .
Level 3 (leaf servers, data access ops):
  SELECT A, COUNT(B) AS c FROM T31 GROUP BY A,  T31 = {/gfs/1}
  . . .
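A minimal sketch of this aggregation pattern (an illustration, not Dremel's implementation): leaf servers compute partial GROUP BY counts over their tablets, and intermediate/root servers merge the partials by summing:

def leaf_count(tablet):
    """Leaf server: SELECT A, COUNT(B) AS c FROM tablet GROUP BY A (NULL B is not counted)."""
    partial = {}
    for a, b in tablet:
        if b is not None:
            partial[a] = partial.get(a, 0) + 1
    return partial

def merge_counts(partials):
    """Intermediate/root server: SELECT A, SUM(c) FROM (R1 UNION ALL …) GROUP BY A."""
    total = {}
    for partial in partials:
        for a, c in partial.items():
            total[a] = total.get(a, 0) + c
    return total

tablets = [[('x', 1), ('y', 2)], [('x', 3), ('x', None)]]
print(merge_counts(leaf_count(t) for t in tablets))   # {'x': 2, 'y': 1}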
Outline Nested columnar storage Query processing Experiments Observations
Experiments
1 PB of real data (uncompressed, non-replicated)
100K-800K tablets per table
Experiments run during business hours

Table | Number of records | Size (unrepl., compressed) | Number of fields | Data center | Repl. factor
T1    | 85 billion        | 87 TB                      | 270              | A           | 3×
T2    | 24 billion        | 13 TB                      | 530              | A           | 3×
T3    | 4 billion         | 70 TB                      | 1200             | A           | 3×
T4    | 1+ trillion       | 105 TB                     | 50               | B           | 3×
T5    | 1+ trillion       | 20 TB                      | 30               | B           | 2×

Notes: first we measure the effect of columnar storage on a single machine. A fragment of T1 with about 300K rows is roughly 1 GB uncompressed, or about 375 MB compressed in columnar form; the machine's disk throughput is around 70 MB/s. This fragment is the data source for the local-disk test on the next slide.
Read from disk
"Cold" time on local disk, averaged over 30 runs
Table partition: 375 MB (compressed), 300K rows, 125 columns
[Figure: time (sec) vs. number of fields read. From columns: (a) read + decompress, (b) assemble records, (c) parse as C++ objects. From records: (d) read + decompress, (e) parse as C++ objects]
About 10x speedup using columnar storage; 2-4x overhead of using records

Notes: curve (a) is the time to read the columnar data and decompress it; (b) adds assembling records from the columns, one record at a time; (c) adds deserialization into C++ objects on top of (b). Curve (d) is the time to read and decompress the record-oriented data, and (e) adds deserialization; with this many fields, deserialization costs more than 50% of the read + decompress time.
MR and Dremel execution
Task: average number of terms in txtField of the 85-billion-record table T1

Sawzall program run on MR:
  num_recs: table sum of int;
  num_words: table sum of int;
  emit num_recs <- 1;
  emit num_words <- count_words(input.txtField);

Dremel query Q1:
  SELECT SUM(count_words(txtField)) / COUNT(*) FROM T1

[Figure: execution time (sec) on 3000 nodes; data read: 87 TB (MR over records), 0.5 TB (MR over columns), 0.5 TB (Dremel)]
MR overheads: launch jobs, schedule 0.5M tasks, assemble records
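Both jobs compute the same statistic, the total term count over txtField divided by the number of records. A small sketch of that computation (count_words is an internal helper, so a whitespace split stands in for it here):

def avg_terms(records):
    """Average number of terms in txtField: SUM(count_words(txtField)) / COUNT(*)."""
    num_recs = 0
    num_words = 0
    for rec in records:
        num_recs += 1
        num_words += len(rec.get('txtField', '').split())   # stand-in for count_words()
    return num_words / num_recs

print(avg_terms([{'txtField': 'hello world'}, {'txtField': 'dremel'}]))   # 1.5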
Impact of serving tree depth
Q2: SELECT country, SUM(item.amount) FROM T2 GROUP BY country  (returns 100s of records)
Q3: SELECT domain, SUM(item.amount) FROM T2 WHERE domain CONTAINS '.net' GROUP BY domain  (returns 1M records)
T2 contains 40 billion nested items
[Figure: execution time (sec) for Q2 and Q3 with 2, 3, and 4 serving-tree levels]

Notes: the chart shows the two queries under different server topologies; every run uses 2900 leaf servers. In the 2-level topology the root server talks directly to the leaves. The 3-level topology uses a 1:100:2900 ratio, adding 100 intermediate servers, and the 4-level topology uses 1:10:100:2900. Q2 completes within 3 seconds on the 3-level topology and gains nothing from a deeper tree; in contrast, deeper topologies clearly improve Q3. This test demonstrates how the tree topology improves performance.
Scalability
[Figure: execution time (sec) vs. number of leaf servers]
Q5 on a trillion-row table T4: SELECT TOP(aid, 20), COUNT(*) FROM T4
Outline Nested columnar storage Query processing Experiments Observations
Interactive speed
Monthly query workload of one 3000-node Dremel instance
[Figure: percentage of queries vs. execution time (sec)]
Most queries complete under 10 sec
Stragglers
Query on the trillion-row table T5 (2× replication)

Notes: T5 is stored with only two replicas, so hitting bad nodes and stragglers is more likely. The query scans roughly 1 TB of compressed data using 2500 nodes. 99% of the tablets are processed within 5 seconds, but unfortunately some tablets take considerably longer. Dynamic re-scheduling helps somewhat, yet at this scale of computation it is hard to avoid problems entirely. If exact results are not essential, scanning a slightly smaller fraction of the data greatly improves response time.
Observations Possible to analyze large disk-resident datasets interactively on commodity hardware 1T records, 1000s of nodes MR can benefit from columnar storage just like a parallel DBMS But record assembly is expensive Interactive SQL and MR can be complementary Parallel DBMSes may benefit from serving tree architecture just like search engines
BigQuery: powered by Dremel
http://code.google.com/apis/bigquery/
1. Upload: upload your data to Google Storage
2. Process: import it into BigQuery tables and run queries
3. Act: use the results in your apps