Download presentation
Presentation is loading. Please wait.
1
libdispatch Grand Central Dispatch 异步并发编程模型 From apple
2
块在其目标队列中被调度执行。 基于队列 任务被拆分成不同的块发往不同的队列。 (一组)块执行完成后,发出通知
队列类型:全局并发队列、main队列、私有串行 队列;
3
全局并发队列 q = dispatch_get_global_queue( 执行函数complex_calculation 100遍:
DISPATCH_QUEUE_PRIORITY_DEFAULT, NULL /* reserved for future use */); 执行函数complex_calculation 100遍: dispatch_apply_f(100, q, user_data, complex_calculation); complex_calculation(user_data, i); /* i ∈ [0, 100) */ 多个complex_calculation同时执行
4
main队列 串行执行(back up by one thead) q_main = dispatch_get_main_queue();
全局队列 用来整合苹果Cocoa框架的主线程
5
私有串行队列 串行执行 q_sum = dispatch_queue_create("com.example.sum", NULL);
#define COUNT 128 double sum = 0; void calc_func(void *data, size_t i) { double x = complex_calculation(i); double *sum = (double *)data; dispatch_async(q_sum, ^{ *sum += x}); } dispatch_apply_f(COUNT, q_default, &sum, calc_func);
6
dispatch_set_target_queue
队列之间的关系 dispatch_set_target_queue 线 程 池 全局 并发 队列 私有队列 main队列 mgr队列 low -overcommit default high
7
主要的类和继承关系 dispatch_object_s dispatch_continuation_s dispatch_queue_s
const void *do_vtable; struct x *volatile do_next; dispatch_object_s unsigned int do_ref_cnt; unsigned int do_xref_cnt; unsigned int do_suspend_cnt; struct dispatch_queue_s *do_targetq; void *do_ctxt; dispatch_function_t do_finalizer; dispatch_continuation_s dispatch_queue_s dispatch_source_s dispatch_queue_attr_s dispatch_source_attr_s dispatch_semaphore_s = dispatch_group_s
8
dispatch_queue_s 用队列(FIFO)方式容纳dispatch_object_s
正在运行的DO数目:uint32_t dq_running; 队列并发的宽度:uint32_t dq_width; struct dispatch_object_s * dq_items_head NULL DO volatile dq_items_tail
9
入队操作 _dispatch_queue_push _dispatch_queue_push_list
_dispatch_queue_push_list_slow 队列为空 inline NO inline _dispatch_wakeup
10
出队操作 _dispatch_queue_concurrent_drain_one _dispatch_queue_drain
并发地汲取一个对象返回之 _dispatch_queue_drain 汲取并处理队列中的全部对象 函数调用前先锁定队列: _dispatch_queue_trylock(dq)
11
块的执行流程 块被封装成dispatch_continuation_s对象
_dispatch_queue_push到目标队列,若目标队列原为空则唤醒之 (_dispatch_wakeup) _dispatch_wakeup(dispatch_object_t dou)作如下工作: 若SUSPENDED,则返回NULL 运行vtable->do_probe,其返回False且队列为空,则返回 NULL _dispatch_trylock(对象锁)失败则返回NULL _dispatch_queue_push(dou.do->do_targetq, dou._do); 最终_dispatch_queue_push到根队列(root queue, do_targetq == NULL)
12
派发到线程池 _dispatch_wakeup(root queue) _dispatch_queue_wakeup_global
vtable->do_probe _dispatch_queue_wakeup_global
13
派发到线程池 pthread_workqueue_additem_np ( _dispatch_wakeup(root queue)
vtable->do_probe _dispatch_queue_wakeup_global int pthread_workqueue_additem_np ( pthread_workqueue_t workq, void *( *workitem_func)(void *), void * workitem_arg, pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp)
14
派发到线程池 pthread_workqueue_additem_np ( _dispatch_wakeup(root queue)
vtable->do_probe _dispatch_queue_wakeup_global int pthread_workqueue_additem_np ( pthread_workqueue_t workq, void *( *workitem_func)(void *), void * workitem_arg, pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp) _dispatch_worker_thread2 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) _dispatch_continuation_pop(item);
15
执行 _dispatch_continuation_pop is a "dispatch_continuation_s" ?
处理flag: DISPATCH_OBJ_ASYNC_BIT 处理flag: DISPATCH_OBJ_GROUP_BIT dc->dc_func(dc->dc_ctxt) or is a "dispatch_queue_s"? 调用_dispatch_queue_invoke 检查SUSPEND 并锁 队列锁 _dispatch_queue_drain 解 队列锁 解 对象锁 (在_dispatch_wakeup中锁定)
16
何时wakeup 队列? push到一个空队列时 dq_running为0时
_dispatch_queue_wakeup_global in _dispatch_queue_concurrent_drain_one(派生更 多工作线程)
17
线程池的实现 使用Darwin线程调用 创建线程池:pthread_workqueue_create_np
依据系统当前负荷决定线程池大小(内核支 持) 添加作业:pthread_workqueue_additem_np 内嵌轻量级的实现 dgq_thread_pool_size指定线程池线程的数量 工作函数:_dispatch_worker_thread 工作线程完成作业后,将睡眠在信号量上若干 秒,直至被唤醒或超时结束进程
18
其他实现技巧 引用计数的实现 内部引用计数(do_ref_cnt) 外部引用计数(do_xref_cnt) —— 更好的使用错 误侦测
简单有效的内存分配缓冲 只缓冲dispatch_continuation_t对象 per-thread,单向链表 只增不减,直至工作线程结束才全部释放 fastpath, slowpath
19
Linux的移植 By Mark Heily http://packages.debian.org/squeeze/libdispatch0
附带支撑库: libkqueue (使用epoll, inotify, signalfd, timerfd,实现BSD的kevent接口) libpthread_workqueue (可选,用户态实现了 pthread_workqueue接口)
20
END
Similar presentations