您现在的位置是:网站首页 > 语言基础 > C++thread类使用总结

简介C++98/03不支持多线程编程,所以必须借助第三方库(如pthreads、boost::thread)或者目标操作系统中的多线程API(如Windows)。自C++11开始包含了一个标准的多线程库,使编写跨平台的多线程应用程序变得更加容易了。

前言

C++98/03不支持多线程编程,所以必须借助第三方库(如pthreads、boost::thread)或者目标操作系统中的多线程API(如Windows)。自C++11开始包含了一个标准的多线程库,使编写跨平台的多线程应用程序变得更加容易了。

使用多线程的理由之一是和进程相比,它是一种非常"节俭"的多任务操作方式。我们知道,在Linux系统下,启动一个新的进程必须分配给它独立的地址空 间,建立众多的数据表来维护它的代码段、堆栈段和数据段,这是一种"昂贵"的多任务工作方式。而运行于一个进程中的多个线程,它们彼此之间使用相同的地址 空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间,而且,线程间彼此切换所需的时间也远远小于进程间切换所需要的时间。

使用多线程的理由之二是线程间方便的通信机制。对不同进程来说,它们具有独立的数据空间,要进行数据的传递只能通过通信的方式进行,这种方式不仅费时,而且很不方便。线程则不然,由于同一进程下的线程之间共享数据空间,所以一个线程的数据可以直接为其它线程所用,这不仅快捷,而且方便。

当然,数据的共享也 带来其他一些问题,有的变量不能同时被两个线程所修改,有的子程序中声明为static的数据更有可能给多线程程序带来灾难性的打击,这些正是编写多线程程序时最需要注意的地方。

除了以上所说的优点外,不和进程比较,多线程程序作为一种多任务、

并发的工作方式,当然有以下的优点:

1) 提高应用程序响应。这对图形界面的程序尤其有意义,当一个操作耗

时很长时,整个系统都会等待这个操作,此时程序不会响应键盘、鼠标、菜单的

操作,而使用多 线程技术,将耗时长的操作(time consuming)置于一个新的

线程,可以避免这种尴尬的情况。

2) 使多CPU系统更加有效。操作系统会保证当线程数不大于CPU数目时,不同的线程运行于不同的CPU上。

3) 改善程序结构。一个既长又复杂的进程可以考虑分为多个线程,成为

几个独立或半独立的运行部分,这样的程序会利于理解和修改。

1.1 创建线程方式

#include <iostream>
#include <functional> // std::bind
#include <thread>
using namespace std;

// 输出取决于系统中处理核心的数量以及操作系统的线程调度
void Counter(int nId, int nIterations)
{
	for (int i = 0; i < nIterations; ++i) {
		cout << "Counter " << nId << " has value " << i << endl;
	}
}

int main(int argc, char* argv[])
{
	thread t1{ Counter, 2, 6 };// 方式1:函数指针
	thread t2{ std::bind(Counter, 3, 6) }; // 方式2: std::bind
	// 方式3: lambda
	thread t3([](int a, double b) { std::cout << "thread3: a:" << a << ",b:" << b << std::endl; }, 1, 2.0); 
	// 确保线程执行完毕
	t1.join();
	t2.join();
	t3.join();
	return 0;
}

注意:C++的流是线程安全的,但来自不同线程的输出仍会交错。可使用互斥体实现同步,参见下面。

1.2 原子操作

在多线程中访问一段数据时,原子也可以解决缓存一致性、内存排序、编译器优化等问题。

1.2.1 原子类型示例

没有添加原子操作,输出结果不确定:

#include <iostream>
#include <thread>
#include <vector>
#include <functional>
#include <chrono>
using namespace std;

//******************************
// 递增100次
//******************************
void func(int& counter)
{
	for (int i = 0; i < 100; ++i) {
		++counter;
		std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 延迟
	}
}
int main(int argc, char* argv[])
{
	int counter = 0;
	std::vector<std::thread> threads;
	for (int i = 0; i < 10; ++i) {
		threads.push_back(std::thread{ func, std::ref(counter) });
	}
	for (auto& t : threads) {
		t.join();
	}
	std::cout << "Result = " << counter << std::endl;
	return 0;
}

添加原子操作后,输出结果确定。 注意:使用原子类型时,应最小化同步次数。 本示例尚可优化。

#include <atomic>
void func(std::atomic<int>& counter)
{
	for (int i = 0; i < 100; ++i) {
		++counter;
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
}
std::atomic<int> counter(0);

优化之后函数为:

void func(std::atomic<int>& counter)
{
	int result = 0;
	for (int i = 0; i < 100; ++i) {
		++result;
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
	counter += result;
}

1.2.2 原子操作示例

std::atomic<int> value(10);
value.fetch_add(4); // 14
value.fetch_sub(1); // 13

1.3 互斥体

1.3.1 <mutex> 头文件介绍

Mutex 系列类(五种)

  • std::mutex,最基本的 Mutex 类。
  • std::recursive_mutex,递归 Mutex 类。
  • std::time_mutex,定时 Mutex 类。
  • std::recursive_timed_mutex,定时递归 Mutex 类。
  • std::shared_timed_mutex,支持共享锁拥有权。

std::mutex和std::recursive_mutex的区别:recursive_mutex允许同一个线程对互斥量多次上锁(即递归上锁)。

std::mutex 的成员函数

  • 构造函数,std::mutex不允许拷贝构造,也不允许move 拷贝,最初产生的 mutex 对象是处于 unlocked 状态的。
  • lock(),调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:(1). 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。(2). 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面 3 种情况,(1). 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。(2). 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。(3). 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。

Lock 类(三种)

  • std::lock_guard,与 Mutex RAII 相关,方便线程对互斥量上锁。
  • std::unique_lock,与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。
  • std::shared_lock,在底层的共享互斥体上调用与共享拥有权相关的方法。

1.3.2 call_once()示例

结合使用std:: call_once()和std::once_flag可以确保某个函数或方法正好之调用一次,不管有多少个线程试图调用都是如此。

#include <thread>
#include <iostream>
#include <mutex>
#include <vector>
using namespace std;
once_flag gOnceFlag;

void initializeSharedResources()
{
	// ... Initialize shared resources that will be used by multiple threads.
	cout << "Shared resources initialized." << endl;
}
void processingFunction()
{
	// Make sure the shared resources are initialized.
	call_once(gOnceFlag, initializeSharedResources);
	// ... Do some work, including using the shared resources
	cout << "Processing" << endl;
}

int main()
{
	// Launch 3 threads.
	vector<thread> threads(3);
	for (auto& t : threads) {
		t = thread{ processingFunction };
	}
	// Join on all threads
	for (auto& t : threads) {
		t.join();
	}
	return 0;
}

1.3.3 time_mutex示例

#include <iostream>       // std::cout
#include <chrono>         // std::chrono::milliseconds
#include <thread>         // std::thread
#include <mutex>          // std::timed_mutex

std::timed_mutex mtx;

void fireworks() {
	// waiting to get a lock: each thread prints "-" every 200ms:
	while (!mtx.try_lock_for(std::chrono::milliseconds(200))) {
		std::cout << "-";
	}
	// got a lock! - wait for 1s, then this thread prints "*"
	std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	std::cout << "*\n";
	mtx.unlock();
}

int main()
{
	std::thread threads[10];
	// spawn 10 threads:
	for (int i = 0; i<10; ++i)
		threads[i] = std::thread(fireworks);

	for (auto& th : threads) th.join();

	return 0;
}

1.3.4 lock_guard示例

以线程安全方式写入流

#include <thread>
#include <iostream>
#include <mutex>
using namespace std;

class Counter
{
public:
	Counter(int id, int numIterations)
		: mId(id), mNumIterations(numIterations)
	{
	}

	void operator()() const
	{
		for (int i = 0; i < mNumIterations; ++i) {
			lock_guard<mutex> lock(mMutex);
			cout << "Counter " << mId << " has value ";
			cout << i << endl;
		}
	}
private:
	int mId;
	int mNumIterations;
	static mutex mMutex;
};

mutex Counter::mMutex;

int main()
{
	// Using uniform initialization syntax
	thread t1{ Counter{ 1, 20 } };
	// Using named variable
	Counter c(2, 12);
	thread t2(c);
	// Using temporary
	thread t3(Counter(3, 10));
	// Wait for threads to finish
	t1.join();
	t2.join();
	t3.join();
	return 0;
}

1.3.5 unique_lock示例

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock

std::mutex mtx;           // mutex for critical section

void print_block(int n, char c) {
	// critical section (exclusive access to std::cout signaled by lifetime of lck):
	std::unique_lock<std::mutex> lck(mtx);
	for (int i = 0; i<n; ++i) {
		std::cout << c;
	}
	std::cout << '\n';
}

int main()
{
	std::thread th1(print_block, 50, '*');
	std::thread th2(print_block, 50, '$');

	th1.join();
	th2.join();

	return 0;
}

1.4 线程异常示例

#include <thread>
#include <iostream>
#include <stdexcept>
using namespace std;

void doSomeWork()
{
	for (int i = 0; i < 5; ++i) {
		cout << i << endl;
	}
	cout << "Thread throwing a runtime_error exception..." << endl;
	throw runtime_error("Exception from thread");
}

void threadFunc(exception_ptr& err)
{
	try {
		doSomeWork();
	}
	catch (...) {
		cout << "Thread caught exception, returning exception..." << endl;
		err = current_exception();
	}
}

void doWorkInThread()
{
	exception_ptr error;
	// Launch background thread
	thread t{ threadFunc, ref(error) };
	// Wait for thread to finish
	t.join();
	// See if thread has thrown any exception
	if (error)
	{
		cout << "Main thread received exception, rethrowing it..." << endl;
		rethrow_exception(error);
	}
	else {
		cout << "Main thread did not receive any exception." << endl;
	}
}

int main()
{
	try {
		doWorkInThread();
	}
	catch (const exception& e) {
		cout << "Main function caught: '" << e.what() << "'" << endl;
	}
	return 0;
}

1.5 条件变量

#include <iostream>              // std::cout
#include <thread>                // std::thread
#include <mutex>                 // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.

void do_print_id(int id)
{
	std::unique_lock <std::mutex> lck(mtx);
	while (!ready)     // 如果标志位不为 true, 则等待...
		cv.wait(lck);  // 当前线程被阻塞, 当全局标志位变为 true 之后,
	// 线程被唤醒, 继续往下执行打印线程编号id.
	std::cout << "thread " << id << '\n';
}

void go()
{
	std::unique_lock <std::mutex> lck(mtx);
	ready = true; // 设置全局标志位为 true.
	cv.notify_all(); // 唤醒所有线程.
}

int main()
{
	std::thread threads[10];
	// spawn 10 threads:
	for (int i = 0; i < 10; ++i)
		threads[i] = std::thread(do_print_id, i);

	std::cout << "10 threads ready to race...\n";
	go(); // go!

	for (auto & th : threads)
		th.join();

	return 0;
}
---------------------------------------------------------------------------------------
#include <iostream>                // std::cout
#include <thread>                  // std::thread, std::this_thread::yield
#include <mutex>                   // std::mutex, std::unique_lock
#include <condition_variable>      // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available()
{
	return cargo != 0;
}

// 消费者线程.
void consume(int n)
{
	for (int i = 0; i < n; ++i) {
		std::unique_lock <std::mutex> lck(mtx);
		cv.wait(lck, shipment_available);
		std::cout << cargo << '\n';
		cargo = 0;
	}
}

int main()
{
	std::thread consumer_thread(consume, 10); // 消费者线程

    // 主线程为生产者线程, 生产 10 个物品
	for (int i = 0; i < 10; ++i) {
		while (shipment_available())
			std::this_thread::yield(); // 允许其他线程运行
		std::unique_lock <std::mutex> lck(mtx);
		cargo = i + 1;
		cv.notify_one();
	}

	consumer_thread.join();

	return 0;
}
---------------------------------------------------------------------------------------
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void do_read_value()
{
	std::cin >> value;
	cv.notify_one();
}

int main()
{
	std::cout << "等待输入一个整数:(计时): \n";
	std::thread th(do_read_value);

	std::mutex mtx;
	std::unique_lock<std::mutex> lck(mtx);
	int nTime = 0;
	while (cv.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout) {
		std::cout << nTime++ << "s" << std::endl;
		std::cout.flush();
	}

	std::cout << "输入: " << value << '\n';

	th.join();
	return 0;
}

1.6 std::future

std::future 可以用来获取异步任务的结果,因此可以把它当成一种简单的线程间同步的手段。

// future example
#include <iostream>             // std::cout
#include <future>               // std::async, std::future
#include <chrono>               // std::chrono::milliseconds

// a non-optimized way of checking for prime numbers:
bool is_prime(int x)
{
	for (int i = 2; i < x; ++i)
		if (x % i == 0)
			return false;
	return true;
}

int main()
{
	// call function asynchronously:
	std::future < bool > fut = std::async(is_prime, 444444443);

	// do something while waiting for function to set future:
	std::cout << "checking, please wait";
	std::chrono::milliseconds span(100);
	while (fut.wait_for(span) == std::future_status::timeout)
		std::cout << '.';

	bool x = fut.get();         // retrieve return value

	std::cout << "\n444444443 " << (x ? "is" : "is not") << " prime.\n";

	return 0;
}
---------------------------------------------------------------------------------------
#include <iostream>       // std::cout
#include <future>         // std::async, std::future, std::shared_future

int do_get_value() { return 10; }

int main()
{
	std::future<int> fut = std::async(do_get_value);
	std::shared_future<int> shared_fut = fut.share();

	// 共享的 future 对象可以被多次访问.
	std::cout << "value: " << shared_fut.get() << '\n';
	std::cout << "its double: " << shared_fut.get() * 2 << '\n';

	return 0;
}
---------------------------------------------------------------------------------------

1.7 线程延迟

void func(int& counter)
{
	std::cout << "延迟开始..." << std::endl;
	// 延迟
	std::this_thread::sleep_for(std::chrono::minutes(1)); 
	std::cout << "延迟结束..." << std::endl;
	// 开始线程操作
	counter++;
}


更多为你推荐