之前写过一篇文章叫《写了一段高端C++代码》,这篇文章的背景和它完全相同,我这里再复述一遍。
背景:
在音视频方向中,线程分为普通线程和GL线程(OpenGL线程),GL线程中可以执行OpenGL相关的语句,做一些图像渲染的工作,也可以理解为所有GL语句都要在GL线程中执行;而在普通线程中,只能执行那些我们平时经常接触的普通语句。
在具体项目开发中会有些需求:在普通线程中突然想要执行某些必须要在GL线程下执行的任务(比如某些初始化工作,释放某些GL相关的对象),执行完此任务后又继续执行自己的任务,像在同一个线程执行一样:
void func() {
task1();
task2(); // 需要在GL线程执行
task3();
}
分析:
这里有个关键点:task3()一定要等到task2()执行完毕后才可执行,但是由于task2()是被抛到了其他线程运行,没有起到阻塞执行的效果。
怎么能达到目的呢?可以这样使用条件变量:
void task2() {
...
notify();
}
void func() {
task1();
task2(); // 需要在GL线程执行
wait();
task3();
}
普通线程在task2()后使用wait()阻塞线程,待GL线程中的任务执行完后使用notity()打断普通线程的阻塞,可达到顺序执行的目的。
但这样非常麻烦,而且不通用,代码还相当难看。
在之前的文章里我使用C++的future封装了一套函数,可以方便的跨线程阻塞调度某个任务执行,然而我还有个项目是使用纯C语言开发的,没有了C++的future,要完成类似需求就比较困难,但是,困难也得搞阿,于是有了下面的代码。
先看下如何函数的设计:
typedef struct Dispatcher Dispatcher;
typedef void (*DispatcherFunc)(void* arg);
/**
* @brief 创建一个实例,内部会常驻一个GL线程,外部可以将某些任务丢到此线程里执行,可以选择是否阻塞执行
*/
Dispatcher* Dispatcher_create();
/**
* @brief 销毁实例
*/
void Dispatcher_destroy(Dispatcher** dispatcher_p);
/**
* @brief 利用此函数将任务丢到线程里执行
*
* @param dispatcher 实例
* @param func 要执行的函数
* @param arg 函数参数
* @param block 选择是否阻塞执行
*/
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);
主要功能就在最后一个函数,该函数可以选择是否阻塞执行,这里大家可以思考一下,如何实现阻塞的需求?条件变量是肯定的,但是如何保证条件变量等待的是当前事件呢?还是直接看代码实现吧:
#include <stdatomic.h>
#include "libctools.h"
#include "gldispatch.h"
typedef struct Task {
DispatcherFunc func;
void* arg;
Mutex* mutex;
Cond* cond;
bool is_continue;
bool is_block;
} Task;
struct Dispatcher {
Thread* thread_id;
Thread _thread_id;
Mutex* mutex;
Cond* cond;
List* task_queue;
atomic_bool is_interrupt;
};
static void taskDestroy(Task* task) {
if (task->mutex) {
mutex_destroyp(&task->mutex);
}
if (task->cond) {
cond_destroyp(&task->cond);
}
free((void*)task);
}
static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) {
Task* task = (Task*)calloc(1, sizeof(Task));
if (!task) return NULL;
task->func = func;
task->arg = arg;
task->is_block = is_block;
if (is_block) {
task->mutex = mutex_create();
task->cond = cond_create();
}
return task;
}
static int Dispatcher_process(void* arg) {
log_info("%s start", __func__);
Dispatcher* self = (Dispatcher*)arg;
while (!atomic_load(&self->is_interrupt)) {
mutex_lock(self->mutex);
while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) {
cond_wait(self->cond, self->mutex);
}
ListNode* node = list_pop_front(self->task_queue);
mutex_unlock(self->mutex);
if (atomic_load(&self->is_interrupt)) {
break;
}
if (node) {
Task* task = (Task*)node->val;
task->func(task->arg);
if (task->is_block) {
mutex_lock(task->mutex);
task->is_continue = true;
cond_signal(task->cond);
mutex_unlock(task->mutex);
} else {
taskDestroy(task);
}
free(node);
}
}
log_info("%s stop", __func__);
return 0;
}
static void Dispatcher_start(Dispatcher* dispatcher) {
dispatcher->thread_id =
thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");
}
static void Dispatcher_stop(Dispatcher* dispatcher) {
atomic_store(&dispatcher->is_interrupt, true);
cond_signal(dispatcher->cond);
list_clear(dispatcher->task_queue, true);
if (dispatcher->thread_id) {
thread_wait(dispatcher->thread_id, NULL);
}
}
Dispatcher* Dispatcher_create() {
Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher));
if (!self) return NULL;
self->mutex = mutex_create();
self->cond = cond_create();
self->task_queue = list_create();
self->task_queue->free_func = (void (*)(int64_t))taskDestroy;
Dispatcher_start(self);
return self;
}
void Dispatcher_destroy(Dispatcher** dispatcher_p) {
if (NULL == dispatcher_p || NULL == *dispatcher_p) return;
Dispatcher* self = *dispatcher_p;
Dispatcher_stop(self);
mutex_destroyp(&self->mutex);
cond_destroyp(&self->cond);
list_destroy(self->task_queue);
freep((void**)dispatcher_p);
}
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) {
if (!dispatcher) return;
Task* task = taskCreate(func, arg, block);
if (task) {
ListNode* node = list_node_new((int64_t)task);
mutex_lock(dispatcher->mutex);
list_push_back(dispatcher->task_queue, node);
cond_signal(dispatcher->cond);
mutex_unlock(dispatcher->mutex);
if (task->is_block) {
mutex_lock(task->mutex);
while (!task->is_continue) {
cond_wait(task->cond, task->mutex);
}
mutex_unlock(task->mutex);
taskDestroy(task);
}
}
}
使用方式:
void func1(void* arg) {
print("hello func1");
}
void func2(void* arg) {
print("hello func2");
}
int main() {
Dispatcher* dispatcher = Dispatcher_create();
Dispatcher_run(dispatcher, func1, NULL, true);
Dispatcher_run(dispatcher, func2, NULL, true);
Dispatcher_destroy(&dispatcher);
return 0;
}
tips:
代码中的log、mutex、cond、thread、list都是二次封装的函数,功能无非就是log、加解锁、条件变量、创建线程以及C语言的链表。这里就不贴出他们的实现了,大家可以自己实现一套,当作个小练习,不难。
代码我也就不过多介绍了,相信有点水平的朋友都能看懂,有问题可以留言!