C++ Chapter 19.6 : 멀티 스레딩 예제 (벡터 내적)

Date:     Updated:

Categories:

Tags:

인프런에 있는 홍정모 교수님의 홍정모의 따라 하며 배우는 C++ 강의를 듣고 정리한 필기입니다. 😀
🌜 [홍정모의 따라 하며 배우는 C++]강의 들으러 가기!


chapter 19. 모던 C++ 필수 요소들

멀티 스레딩 예제 (벡터 내적)

🔔 벡터 내적(Dot Product)이란

\[\vec{A}\cdot\vec{B} = x_A{\cdot}x_B + y_A{\cdot}y_B \]

  • 두 벡터의 성분끼리 곱해서 더한 스칼라 값
  • 수학적으로는 A dot B 으로 표현한다.
  • 벡터 내적의 원리와 개념 참고


🔔 벡터 내적을 계산 하는 코드

7 가지 방법으로 벡터 내적을 계산해보고 각각의 방법이 얼마나 효율적인지 실행 시간을 재볼 것이다.

전체 코드와 비교

#include <iostream>
#include <chrono>
#include <mutex>	
#include <utility>
#include <vector>	
#include <atomic>
#include <numeric>		// std::inner_product
#include <random>
#include <execution>	// parallel execution
#include <future>
#include <thread>

using namespace std;

mutex mtx;

void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}

void dotProductLock(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	//cout << "Thread Start" << endl;
	for (unsigned int i = start; i < end; ++i)
	{
		std::scoped_lock lock(mtx);
		sum += (v0[i] * v1[i]);
	}
	//cout << "Thread End" << endl;
}

void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}

auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end)
{
	int sum = 0;
	for (unsigned int i = start; i < end; ++i)
		sum += (v0[i] * v1[i]);
	return sum;
}


int main()
{
	const long long numData = 100'000'000;
	const unsigned int numThread = 4;
	// thread 개수 늘린다고 degree of multithreading이 높아지지는 않음

	vector<int> vec0, vec1;
	vec0.reserve(numData);
	vec1.reserve(numData);

	std::random_device seed;
	std::mt19937_64 makerand(seed());
	std::uniform_int_distribution<> range(1, 10);

	for (long long i = 0; i < numData; ++i)
	{
		vec0.push_back(range(makerand));
		vec1.push_back(range(makerand));
	}

	cout << "실험 1번 - std::inner_product\n";
	{
		const auto sta = std::chrono::steady_clock::now();		// 시간 측정 시작
		const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
		// 두 벡터의 개수가 같다고 가정하므로 vec1은 begin만 있어도 되며 ull은 unsigned long long의 약자
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;		

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}

	cout << "실험 2번 - Naive\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		unsigned long long sum = 0;

		vector<std::thread> threads;
		threads.resize(numThread);

		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();

		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}

	cout << "실험 3번 - Lockguard\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		unsigned long long sum = 0;

		vector<std::thread> threads;
		threads.resize(numThread);

		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();

		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}

	cout << "실험 4번 - Atomic\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		atomic<unsigned long long> sum = 0;

		vector<std::thread> threads;
		threads.resize(numThread);

		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductAtomic, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();

		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}

	cout << "실험 5번 - Future\n";
	{
		const auto sta = std::chrono::steady_clock::now();

		unsigned long long sum = 0;

		vector<std::future<int>> futures;
		futures.resize(numThread);

		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread);
		for (unsigned int t = 0; t < numThread; ++t)
			sum += futures[t].get();
		
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	// TODO : Future의 divide and conquer 방식은 thread에서도 구현해보자.
	// async 대신에 thread와 promise를 사용해서 future을 사용해보자.
	
  cout << "실험 6번 - promise\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		vector<std::promise<int>> prom;
		vector<std::future<int>> future;
		vector<std::thread> threads;
		prom.resize(numThread);
		future.resize(numThread);
		threads.resize(numThread);
		
		unsigned long long sum(0);
		unsigned numPerThread = numData / numThread;
		for (unsigned i = 0; i < numThread; ++i)
		{
			future[i] = prom[i].get_future();
			unsigned long long tempSum(0);
			threads[i] = std::thread([&](std::promise<int>&& prom)
				{
					std::scoped_lock lock2(mtx);
					
					for (unsigned int j = 0; j < numPerThread; ++j)
						tempSum += (vec0[j] * vec1[j]);
					prom.set_value(tempSum);
				}, std::move(prom[i]));
			threads[i].join();
		}
		for (unsigned int t = 0; t < numThread; ++t)
			sum += future[t].get();

		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}

	cout << "실험 7번 - std::transform_reduce\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;

		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
}
💎출력💎

실험 1번 - std::inner_product
연산에 소요된 시간 = 0.068767
결과값 = 3025113384

실험 2번 - Naive
연산에 소요된 시간 = 0.04685
결과값 = 889569211

실험 3번 - Lockguard
연산에 소요된 시간 = 2.15617
결과값 = 3025113384

실험 4번 - Atomic
연산에 소요된 시간 = 1.31186
결과값 = 3025113384

실험 5번 - Future
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384

실험 6번 - promise
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384

실험 7번 - std::transform_reduce
연산에 소요된 시간 = 0.014745
결과값 = 3025113384
  • 1️⃣std::inner_product ⭕, 빠름
  • 2️⃣ 순진한 멀티 쓰레딩 ❌, 빠름
    • 레이스 컨디션으로 값이 부정확하게 나온다.
  • 3️⃣ Lock guard ⭕, 느림
    • lock, unlock 과정 때문에 느리다.
  • 4️⃣ atomic ⭕, 느림
    • atomic의 연산은 느린 편이다.
  • 5️⃣ Task-based parallelism (std::async) ⭕, 빠름
  • 6️⃣ Task-based parallelism (std::thread, std::promise) ⭕, 빠름
  • 7️⃣ std::transform_reduce ⭕, 빠름


main 함수

using namespace std;

mutex mtx;

int main()
{
	const long long numData = 100'000'000;
	const unsigned int numThread = 4;

	vector<int> vec0, vec1;
	vec0.reserve(numData);
	vec1.reserve(numData);

	std::random_device seed;
	std::mt19937_64 makerand(seed());
	std::uniform_int_distribution<> range(1, 10);

	for (long long i = 0; i < numData; ++i)
	{
		vec0.push_back(range(makerand));
		vec1.push_back(range(makerand));
	}
  • mtx
    • mutex 객체로 전역 변수로 선언 되었다.
  • numData
    • 벡터 크기
    • 1억개 !
  • numThread
    • 스레드 개수를 4 개로 지정했다.
    • cf) 스레드를 많이 쓴다고 멀티스레딩 효율이 높아지는건 아니다
  • vec0, vec1
    • 두 벡터 컨테이너
    • 각각 reserve 함수를 통해 numData (1억) 크기를 가진 벡터로 만들었다.
  • seed
    • 랜덤넘버 생성 시드
  • std::mt19937_64 makerand(seed());
    • 랜덤 넘버 생성기
  • std::uniform_int_distribution<> range(1, 10);
    • 1~10 범위 중 랜덤 넘버
  • for문
    • 벡터 크기안 1억번 만큼 for문을 돌며 두 벡터에 각각 1~10 중 랜덤한 숫자를 원소로 삽입한다.


방법 1️⃣ std::inner_product

#include <numeric>

  • std::inner_product은 두 벡터의 내적값을 리턴해주는 함수다.
  const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
  • sum에 벡터 내적의 결과를 담음
  • vec1end() 없이 begin() 만 있어도 된다.
    • 벡터 내적을 구할 땐 두 벡터의 성분 개수가 같다고 가정하므로.
  • 0ull
    • 0을 unsigned long long 으로 표기
      • 마치 16진수면 0x라 하는것과 같음
    • sum이 unsigned long long int 타입이 되게끔.
      • 그냥 0이라고 하면 sum은 unsigned long long int 가 아닌 그냥 int가 됨.


방법 2️⃣ 순진한 멀티 쓰레딩

void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}

...

unsigned long long sum = 0;

vector<std::thread> threads;
threads.resize(numThread);

const unsigned int numPerThread = numData / numThread;

for (unsigned int t = 0; t < numThread; ++t)
	threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));

for (unsigned int t = 0; t < numThread; ++t)
	threads[t].join();
  • threads
    • 스레드들을 담는 vector
    • 벡터의 사이즈를 numThread, 즉 4 개 만큼 늘렸다.
  • numPerThread
    • 1 개의 스레드 당 작업할 데이터 개수
    • 모든 스레드의 작업 수가 모두 같다고 가정. (나머지가 0인 경우를 가정)
      • numData 값이 1 억이니 한 스레드 당 2500 만개씩의 데이터를 병렬 처리 하는 셈이다.
  • 첫번째 for문 👉 4 번 돈다.
    • 스레드 마다 벡터 내적을 구하는 작업을 부여
      • 하나의 스레드마다 2500만개(numPerThread)의 데이터의 벡터 내적을 구하고 결과를 sum에 담는다.
        • 첫번째 인수는 함수 doProductNaive. 나머지 인자들은 함수에 필요한 인자들.
          • 시작 범위 t * numPerThread, 끝 범위 (t + 1) * numPerThread
  • 두번째 for문
    • 벡터의 스레드들이 작업이 끝나기를 대기

이 방법은 레이스 컨디션이 발생하기 때문에 벡터 내적의 결과가 부정확하게 나온다.

  • 모든 스레드가 sum 메모리를 공유하기 때문에 발생한다.


방법 3️⃣ Lock guard

void dotProductLock(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		std::scoped_lock lock(mtx);
		sum += (v0[i] * v1[i]);
	}
}

...

		unsigned long long sum = 0;

		vector<std::thread> threads;
		threads.resize(numThread);

		const unsigned int numPerThread = numData / numThread;

		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));

		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();

sum값을 가지고 연산을 하고 그 결과를 sum에 덮어 씌우는 과정을 한 스레드가 작업 중일땐 다른 스레드가 접근하지 못하도록 lock 하는 방법으로 레이스 컨디션을 해결할 수 있다.

std::scoped_lock lock(mtx);  // scoped_lock 은 unlock 이 필요 없다. C++ 17 부터 가능
sum += (v0[i] * v1[i]);
  • 이제 벡터 내적의 값은 정확하게 나오지만 굉장히 느리다. 무려 2 초를 넘긴다 !
    • lock 을 1 억번 걸어주어야 하고 한 스레드가 작업을 하고 있을 땐 다른 스레드들이 기다려야 하기 때문에..
    • 너무 느려서 멀티 스레딩 하는 보람이 없다.
      • 물론 경우에 따라 다르긴 하지만 이 경우엔 그렇다.
  • scoped_lock을 for문 바깥에다 놓으면 빨라지지만 이건 비동기 작업이 아니라 그냥 동기적으로 순차적으로 실행되는거나 마찬가지다. 병렬 처리가 아님.


방법 4️⃣ atomic


void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}

...

		const unsigned int numPerThread = numData / numThread;

		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));

		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
  • 벡터 내적 결과를 담을 sumatomic<unsigned long long> 타입으로 받고 있다.
    • 메모리 값을 CPU로 가져오고, CPU에서 연산을 하고, 다시 연산 결과를 메모리에 덮어 씌우는 이 3 단계의 과정을 하나로 묶어준다.
    • 이 3단계의 과정 도중엔 다른 스레드가 접근하지 못하도록!
  • 결과는 정확하나 Lock Guard 처럼 느리다.
    • atomic 연산이 일반 연산보다 느리기 때문에.


방법 5️⃣ Task-based parallelism (std::async)

auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end)
{
	int sum = 0;
	for (unsigned int i = start; i < end; ++i)
		sum += (v0[i] * v1[i]);
	return sum;
}

...

		unsigned long long sum = 0;

		vector<std::future<int>> futures;
		futures.resize(numThread);

		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread);

		for (unsigned int t = 0; t < numThread; ++t)
			sum += futures[t].get();
  • futures
    • std::future 객체들이 들어있는 벡터
  • std::async을 통해 doProductFuture 연산 결과를 futures[t] 원소에 받는다.
    • doProductFuture 함수
      • 위 예제들과 다르게 sumdoProductFuture의 인자로 들어가지 않는다.
      • 위 예제의 다른 함수들과 다르게 void가 아닌 리턴 타입이 있는 함수로 설정했다다. 이를 async에서 받아 future객체에 리턴할 수 있도록! 로컬 sum을 리턴한다.
        • 스레드는 연산 결과를 직접 받지 못하기 때문에 void 함수와 더불어 또 조작할 변수도 레퍼런스로 함께 넘겨 주었어야 했음
  • futures[t]에 연산 결과를 받기 전까진 futures[t].get 출력이 실행되지 않는다.
    • sum += futures[t].get();
      • thread는 레퍼런스로 sum을 넘겨서 함수 내부에서 += 했으니 최종적으로 또 취합할 일이 없었다. 대신 join으로 각 스레드가 일이 끝나길 기다리는 과정이 필요했음
        • thread는 다루는 sum이 global 한 변수였기 때문에 이를 레퍼런스로 넘겼다.
        • 그러니 스레드들이 sum을 동시에 건드려 레이스 컨디션이 일어날 우려가 있었음
      • async은 로컬 sum을 리턴하기 때문에 futures.get() 을 통해 리턴 받은 값을 sum += 취합해주어야 한다.
        • 로컬 sum이기에 함수 실행마다 각각 다른 별개의 sum이니까 서로 영향을 줄 일이 없다. 즉 레이스 컨디션이 일어날 일이 없음!
  • 오히려 Naive 하게 멀티 스레딩 했던 2️⃣ 방법보다 연산이 더 빨랐다.


방법 6️⃣ Task-based parallelism (std::thread, std::promise)

		vector<std::promise<int>> prom;
		vector<std::future<int>> future;
		vector<std::thread> threads;
		prom.resize(numThread);
		future.resize(numThread);
		threads.resize(numThread);
		
		unsigned long long sum(0);
		unsigned numPerThread = numData / numThread;

		for (unsigned i = 0; i < numThread; ++i)
		{
			future[i] = prom[i].get_future();  // prom 에서 future 가져와 저장
			unsigned long long tempSum(0);

			threads[i] = std::thread([&](std::promise<int>&& prom)
			{
					std::scoped_lock lock2(mtx);
					
					for (unsigned int j = 0; j < numPerThread; ++j)
						tempSum += (vec0[j] * vec1[j]);

					prom.set_value(tempSum);

			}, std::move(prom[i]));

			threads[i].join();
		}

		for (unsigned int t = 0; t < numThread; ++t)
			sum += future[t].get();

5️⃣와 같다. 다만 asyncthread로 바꾸고 promisefuture에 연산 결과를 받도록 설정한 것 뿐!

  • vector 3개 필요
    vector<std::promise<int>> prom;
    vector<std::future<int>> future;
    vector<std::thread> threads;
    
  • 첫 번째 for 문
    • future 객체에 약속한 promise들을 넘기기로 약속한다.
      • future[i] = prom[i].get_future();
    • 아래와 같은 기능을 하는 람다함수를 스레드에게 부여하고 매개 변수인 promprom[i]을 R-value로서 참조하기 때문에 이 연산 결과를 future로 저장하게 한다.
      std::scoped_lock lock2(mtx);
      					
          for (unsigned int j = 0; j < numPerThread; ++j)
                  tempSum += (vec0[j] * vec1[j]);
                  prom.set_value(tempSum);
      
  • 두 번째 for 문
    • futures[t]에 연산 결과를 받기 전까진 futures[t].get 출력이 실행되지 않는다.


방법 7️⃣ std::transform_reduce

#include <numeric>

const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
  • std::transform_reduce
    • 병렬 처리를 지원해주는 C++ 표준 함수이다.
    • inner_product의 발전형으로 병렬 처리로 벡터 내적을 계산한다.
  • inner_product보다 인수가 하나 더 필요하다.
    • std::execution::par
      • 병렬처리로 돌려달라.
      • 마치 스레드를 사용하는 것 처럼
    • std::execution::seq
      • 병렬처리 말고 순차적으로, 동기적으로 돌려달라.


🔔 참고



🌜 개인 공부 기록용 블로그입니다. 오류나 틀린 부분이 있을 경우 
언제든지 댓글 혹은 메일로 지적해주시면 감사하겠습니다! 😄

맨 위로 이동하기

Cpp 카테고리 내 다른 글 보러가기

Leave a comment