网站建设 图片他达拉非功效与作用主要会有哪些
最近开始写小册子,一篇篇来,写完了再整理总结到一起。
Libuv是基于事件驱动的异步io库,他本身是一个单进程单线程的。但是难免会有耗时的操作。如果在Libuv的主循环里执行的话,就会阻塞后面的任务执行。所以Libuv里维护了一个线程池。他负责处理Libuv中耗时的操作,比如文件io、dns、用户自定义的耗时任务。文件io因为存在跨平台兼容的问题。无法很好地在事件驱动模块实现异步io。下面分析一下线程池的实现。
我们先看线程池的初始化然后再看他的使用。
static void init_threads(void) {unsigned int i;const char* val;uv_sem_t sem;// 默认线程数4个,static uv_thread_t default_threads[4];nthreads = ARRAY_SIZE(default_threads);// 判断用户是否在环境变量中设置了线程数,是的话取用户定义的val = getenv("UV_THREADPOOL_SIZE");if (val != NULL)nthreads = atoi(val);if (nthreads == 0)nthreads = 1;// #define MAX_THREADPOOL_SIZE 128最多128个线程if (nthreads > MAX_THREADPOOL_SIZE)nthreads = MAX_THREADPOOL_SIZE;threads = default_threads;// 超过默认大小,重新分配内存if (nthreads > ARRAY_SIZE(default_threads)) {threads = uv__malloc(nthreads * sizeof(threads[0]));// 分配内存失败,回退到默认if (threads == NULL) {nthreads = ARRAY_SIZE(default_threads);threads = default_threads;}}// 初始化条件变量if (uv_cond_init(&cond))abort();// 初始化互斥变量if (uv_mutex_init(&mutex))abort();// 初始化三个队列QUEUE_INIT(&wq);QUEUE_INIT(&slow_io_pending_wq);QUEUE_INIT(&run_slow_work_message);// 初始化信号量变量,值为0if (uv_sem_init(&sem, 0))abort();// 创建多个线程,工作函数为worker,sem为worker入参for (i = 0; i < nthreads; i++)if (uv_thread_create(threads + i, worker, &sem))abort();//为0则阻塞,非0则减一,这里等待所有线程启动成功再往下执行for (i = 0; i < nthreads; i++)uv_sem_wait(&sem);uv_sem_destroy(&sem);
}
线程池的初始化主要是初始化一些数据结构,然后创建多个线程。接着在每个线程里执行worker函数。worker是消费者,在分析消费者之前,我们先看一下生产者的逻辑。
static void init_once(void) {init_threads();
}// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,struct uv__work* w,enum uv__work_kind kind,void (*work)(struct uv__work* w),void (*done)(struct uv__work* w, int status)) {// 保证已经初始化线程,并只执行一次,所以线程池是在提交第一个任务的时候才被初始化uv_once(&once, init_once);w->loop = loop;w->work = work;w->done = done;post(&w->wq, kind);
}
这里把业务相关的函数和任务完成后的回调函数封装到uv__work结构体中。uv__work结构定义如下。
struct uv__work {void (*work)(struct uv__work *w);void (*done)(struct uv__work *w, int status);struct uv_loop_s* loop;void* wq[2];
};
然后调post往线程池的队列中加入一个新的任务。Libuv把任务分为三种类型,慢io(dns解析)、快io(文件操作)、cpu密集型等,kind就是说明任务的类型的。我们接着看post函数。
// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {// 加锁访问任务队列,因为这个队列是线程池共享的uv_mutex_lock(&mutex);// 类型是慢IOif (kind == UV__WORK_SLOW_IO) {/* 插入慢IO对应的队列,llibuv这个版本把任务分为几种类型,对于慢io类型的任务,libuv是往任务队列里面插入一个特殊的节点run_slow_work_message,然后用slow_io_pending_wq维护了一个慢io任务的队列,当处理到run_slow_work_message这个节点的时候,libuv会从slow_io_pending_wq队列里逐个取出任务节点来执行。 */QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);/*有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入主队列。如果run_slow_work_message非空,说明已经插入线程池的任务队列了。解锁然后直接返回。*/if (!QUEUE_EMPTY(&run_slow_work_message)) {uv_mutex_unlock(&mutex);return;}// 说明run_slow_work_message还没有插入队列,准备插入队列q = &run_slow_work_message;}// 把节点插入主队列,可能是慢IO消息节点或者一般任务QUEUE_INSERT_TAIL(&wq, q);// 有空闲线程则唤醒他,如果大家都在忙,则等到他忙完后就会重新判断是否还有新任务if (idle_threads > 0)uv_cond_signal(&cond);uv_mutex_unlock(&mutex);
}
这就是libuv中线程池的生产者逻辑。架构如下。
除了上面提到的,libuv还提供了另外一种生产者。即uv_queue_work函数。他只针对cpu密集型的。从实现来看,他和第一种生产模式的区别是,通过uv_queue_work提交的任务,是对应一个request的。如果该request对应的任务没有执行完,则事件循环不会退出。而通过uv__work_submit方式提交的任务就算没有执行完,也不会影响事件循环的退出。下面我们看uv_queue_work的实现。
int uv_queue_work(uv_loop_t* loop,uv_work_t* req,uv_work_cb work_cb,uv_after_work_cb after_work_cb) {if (work_cb == NULL)return UV_EINVAL;uv__req_init(loop, req, UV_WORK);req->loop = loop;req->work_cb = work_cb;req->after_work_cb = after_work_cb;uv__work_submit(loop,&req->work_req,UV__WORK_CPU,uv__queue_work,uv__queue_done);return 0;
}
uv_queue_work函数其实也没有太多的逻辑,他保存用户的工作函数和回调到request中。然后提交任务,然后把uv__queue_work和uv__queue_done封装到uv__work中,接着提交任务。所以当这个任务被执行的时候。他会执行工作函数uv__queue_work。
static void uv__queue_work(struct uv__work* w) {// 通过结构体某字段拿到结构体地址uv_work_t* req = container_of(w, uv_work_t, work_req);req->work_cb(req);
}
我们看到uv__queue_work其实就是对用户定义的任务函数进行了封装。这时候我们可以猜到,uv__queue_done也只是对用户回调的简单封装,即他会执行用户的回调。至此,我们分析完了libuv中,线程池的两种生产任务的方式。下面我们开始分析消费者。消费者由worker函数实现。
static void worker(void* arg) {struct uv__work* w;QUEUE* q;int is_slow_work;// 线程启动成功uv_sem_post((uv_sem_t*) arg);arg = NULL;// 加锁互斥访问任务队列uv_mutex_lock(&mutex);for (;;) {/*1 队列为空,2 队列不为空,但是队列里只有慢IO任务且正在执行的慢IO任务个数达到阈值则空闲线程加一,防止慢IO占用过多线程,导致其他快的任务无法得到执行*/while (QUEUE_EMPTY(&wq) ||(QUEUE_HEAD(&wq) == &run_slow_work_message &&QUEUE_NEXT(&run_slow_work_message) == &wq &&slow_io_work_running >= slow_work_thread_threshold())) {idle_threads += 1;// 阻塞,等待唤醒uv_cond_wait(&cond, &mutex);// 被唤醒,开始干活,空闲线程数减一 idle_threads -= 1;}// 取出头结点,头指点可能是退出消息、慢IO,一般请求q = QUEUE_HEAD(&wq);// 如果头结点是退出消息,则结束线程if (q == &exit_message) {// 唤醒其他因为没有任务正阻塞等待任务的线程,告诉他们准备退出uv_cond_signal(&cond);uv_mutex_unlock(&mutex);break;}// 移除节点 QUEUE_REMOVE(q);// 重置前后指针QUEUE_INIT(q); is_slow_work = 0;/* 如果当前节点等于慢IO节点,上面的while只判断了是不是只有慢io任务且达到阈值,这里是任务队列里肯定有非慢io任务,可能有慢io,如果有慢io并且正在 执行的个数达到阈值,则先不处理该慢io任务,继续判断是否还有非慢io任务可执行。*/if (q == &run_slow_work_message) { // 遇到阈值,重新入队 if (slow_io_work_running >= slow_work_thread_threshold()) {QUEUE_INSERT_TAIL(&wq, q);continue;}// 没有慢IO任务则继续if (QUEUE_EMPTY(&slow_io_pending_wq))continue;// 有慢io,开始处理慢IO任务is_slow_work = 1;// 正在处理慢IO任务的个数累加,用于其他线程判断慢IO任务个数是否达到阈值slow_io_work_running++;// 摘下一个慢io任务q = QUEUE_HEAD(&slow_io_pending_wq);QUEUE_REMOVE(q);QUEUE_INIT(q);/*取出一个任务后,如果还有慢IO任务则把慢IO标记节点重新入队,表示还有慢IO任务,因为上面把该标记节点出队了 */if (!QUEUE_EMPTY(&slow_io_pending_wq)) {QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);// 有空闲线程则唤醒他,因为还有任务处理if (idle_threads > 0)uv_cond_signal(&cond);}}// 不需要操作队列了,尽快释放锁uv_mutex_unlock(&mutex);// q是慢IO或者一般任务w = QUEUE_DATA(q, struct uv__work, wq);// 执行业务的任务函数,该函数一般会阻塞w->work(w);// 准备操作loop的任务完成队列,加锁uv_mutex_lock(&w->loop->wq_mutex);// 置空说明指向完了,不能被取消了,见cancel逻辑w->work = NULL; // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);// 通知loop的wq_async节点uv_async_send(&w->loop->wq_async);uv_mutex_unlock(&w->loop->wq_mutex);// 为下一轮操作任务队列加锁uv_mutex_lock(&mutex);// 执行完慢IO任务,记录正在执行的慢IO个数变量减1,上面加锁保证了互斥访问这个变量if (is_slow_work) {slow_io_work_running--;}}
}
我们看到消费者的逻辑似乎比较复杂,主要是把任务分为三种。并且对于慢io类型的任务,还限制了线程数。其余的逻辑和一般的线程池类型,就是互斥访问任务队列,然后取出节点执行,最后执行回调。不过libuv这里不是直接回调用户的函数。而是通过uv_async_send(&w->loop->wq_async)通知主进程有任务完成了。然后线程继续执行任务。我们看一下这个函数的实现。
// 通知主线程有任务完成
int uv_async_send(uv_async_t* handle) {/*1 pending是0,则设置为1,返回0,2 pending是1则返回1,所以同一个async如果多次调用该函数是会被合并的。只有pending等于0的时候才会执行uv__async_send
*/if (cmpxchgi(&handle->pending, 0, 1) == 0)uv__async_send(handle->loop);return 0;
}static void uv__async_send(uv_loop_t* loop) {const void* buf;ssize_t len;int fd;int r;buf = "";len = 1;// 用于异步通信的管道的写端fd = loop->async_wfd;#if defined(__linux__)// fd等于1说明用的是eventfd而不是管道,管道才有两端if (fd == -1) {static const uint64_t val = 1;buf = &val;len = sizeof(val);// 见uv__async_startfd = loop->async_io_watcher.fd; /* eventfd */}
#endif// 通知读端dor = write(fd, buf, len);while (r == -1 && errno == EINTR);// 省略部分代码
}
uv__async_send通过网eventfd中写入一些数据,触发了对应io观察者的事件。之前在分析async机制的时候讲过。该io观察者的回调是uv__work_done函数。那么我们就看看这个函数的逻辑。
void uv__work_done(uv_async_t* handle) {struct uv__work* w;uv_loop_t* loop;QUEUE* q;QUEUE wq;int err;// 通过结构体字段获得结构体首地址loop = container_of(handle, uv_loop_t, wq_async);// 准备处理队列,加锁uv_mutex_lock(&loop->wq_mutex);// 把loop->wq队列的节点全部移到wp变量中,这样一来可以尽快释放锁QUEUE_MOVE(&loop->wq, &wq);// 不需要使用了,解锁uv_mutex_unlock(&loop->wq_mutex);// wq队列的节点来源是在线程的worker里插入while (!QUEUE_EMPTY(&wq)) {q = QUEUE_HEAD(&wq);QUEUE_REMOVE(q);w = container_of(q, struct uv__work, wq);err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;// 执行回调w->done(w, err);}
}
逐个处理已完成的任务节点,执行回调。这就是整个消费者的逻辑。最后顺带提一下w->work == uv__cancelled。这个处理的用处是为了支持取消一个任务。Libuv提供了uv__work_cancel函数支持用户取消提交的任务。我们看一下他的逻辑。
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {int cancelled;// 加锁,为了把节点移出队列uv_mutex_lock(&mutex);// 加锁,为了判断w->wq是否为空uv_mutex_lock(&w->loop->wq_mutex);/*w在在任务队列中并且任务函数work不为空,则可取消,在work函数中,如果执行完了任务,会把work置NULL,所以一个任务可以取消的前提是他还没执行完。或者说还没执行过*/cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;// 从任务队列中删除该节点if (cancelled)QUEUE_REMOVE(&w->wq);uv_mutex_unlock(&w->loop->wq_mutex);uv_mutex_unlock(&mutex);// 不能取消if (!cancelled)return UV_EBUSY;// 重置回调函数w->work = uv__cancelled;uv_mutex_lock(&loop->wq_mutex);/*插入loop的wq队列,对于取消的动作,libuv认为是任务执行完了。所以插入已完成的队列,不过他的回调是uv__cancelled函数,而不是用户设置的回调*/QUEUE_INSERT_TAIL(&loop->wq, &w->wq);// 通知主线程有任务完成uv_async_send(&loop->wq_async);uv_mutex_unlock(&loop->wq_mutex);return 0;
}
最后我们举一个使用线程池的例子。这里以文件操作为例子,因为nodejs中文件读写是以线程池实现的。这里直接从uv_fs_open开始(因为js层到c++层主要是一些封装。最后会调到uv_fs_open)。直接看一下uv_fs_open的代码。
// 下面代码是宏展开后的效果
int uv_fs_open(uv_loop_t* loop,uv_fs_t* req,const char* path,int flags,int mode,uv_fs_cb cb
) { // 初始化一些字段UV_REQ_INIT(req, UV_FS); req->fs_type = UV_FS_ ## subtype; req->result = 0; req->ptr = NULL; req->loop = loop; req->path = NULL; req->new_path = NULL; req->bufs = NULL; req->cb = cb; // 同步 if (cb == NULL) { req->path = path; } else { req->path = uv__strdup(path); } req->flags = flags;req->mode = mode; if (cb != NULL) { uv__req_register(loop, req); /* 异步*/ uv__work_submit(loop, &req->work_req, UV__WORK_FAST_IO, uv__fs_work, uv__fs_done ); return 0; } else { /* 同步 */ uv__fs_work(&req->work_req); return req->result; }
我们从上往下看,没有太多的逻辑,函数的最后一个参数cb是nodejs的c++层设置的,c++层会再回调js层。然后open(大部分的文件操作)分为同步和异步两种模式(即fs.open和openSync)。同步直接导致nodejs阻塞,不涉及到线程池,这里只看异步模式。我们看到异步模式下是调用uv__work_submit函数给线程池提交一个任务。设置的工作函数和回调函数分别是uv__fs_work,uv__fs_done。所以我们看一下这两函数。uv__fs_work函数主要是调用操作系统提供的函数。比如open。他会引起线程的阻塞,等到执行完后,他会把返回结果保存到request结构体中。接着执行就是遵从线程池的处理流程。执行回调uv__fs_done。
static void uv__fs_done(struct uv__work* w, int status) {uv_fs_t* req;req = container_of(w, uv_fs_t, work_req);uv__req_unregister(req->loop, req);// 取消了if (status == UV_ECANCELED) {req->result = UV_ECANCELED;}// 执行用户设置的回调,比如nodejsreq->cb(req);
}
没有太多逻辑,直接执行回调,顺便提一下,nodejs里则是执行c++层函数AfterInteger(代码在node_file.cc的Open函数)。
void AfterInteger(uv_fs_t* req) {FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);FSReqAfterScope after(req_wrap, req);if (after.Proceed())req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}void FSReqWrap::Resolve(Local<Value> value) {Local<Value> argv[2] {Null(env()->isolate()),value};MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}
执行resolve,然后执行js层的oncomplete回调,即用户执行open函数时传入的函数。至此,线程池分析完成。