NVIDA的CUDA"__syncthreads()"在传统C++中的等价物是什么。如何专业地同步线程?

What is NVIDA's CUDA '__syncthreads()' equivalent in conventional C++. How to synchronize threads professionally?

本文关键字:是什么 何专业 线程 等价物 同步 C++ syncthreads CUDA NVIDA 传统      更新时间:2023-10-16

我的应用程序中有 4 个线程。一个是主线程,另外 3 个是工作线程。我希望这 3 个工作线程中的前 2 个生成数据,第三个在生成数据时写入数据。数据生成器线程应同步,以便它们并行运行(同时启动"for"循环的每次迭代)。如果 CPU 足够快,编写器线程应该一直在写入。我不知道如何在C++中专业地同步所有这 3 个线程,所以我编写的代码就像有"__syncthreads()"函数来表示我的意思最好的方式。在传统C++中是否有CUDA C'__syncthreads()'的等价物?如果没有,那么如何以我想要的方式最佳地实现同步?(我不喜欢代码中的那些while循环。它们只是不必要地提高了 CPU 利用率)

volatile bool write_flag ;
int main(int argc, char **argv)
{
    ...
    write_flag = false ; // nothing to write at the beginning
    ...
    HANDLE *trdHandles = new HANDLE[WORKING_THREADS] ;
    int IDs[] = {0, 1} ; // IDs for generator threads
    trdHandles[0] = CreateThread(NULL, 0, generator, &IDs[0], 0, NULL) ;  // 1st data generator thread
    if(trdHandles[0] == NULL)
    ExitProcess(0) ;
    trdHandles[1] = CreateThread(NULL, 0, generator, &IDs[1], 0, NULL) ;  // 2nd data generator thread
    if(trdHandles[1] == NULL)
    ExitProcess(0) ;
    trdHandles[2] = CreateThread(NULL, 0, writer, f_out, 0, NULL) ;  // writer thread
    if(trdHandles[2] == NULL)
    ExitProcess(0) ;
    ...
}
WINAPI DWORD generator(LPVOID lpParam)
{
    int *ID = static_cast<int*>(lpParam) ;
    dataGen(*ID) ;
    return 0 ;
}
void dataGen(int id)
{
    ...
    for(int aa = 0; aa < cycles; aa++)
    {
        __syncthreads() ;
        ... // both threads generate data here in parallel
        while(write_flag) // don't generate data too fast. Wait for writes to complete (this flag is initially set to 'false')
        ;
        setBuffers(id, aa) ; // for swapping in/out buffers
        if(id == 0) // only one thread needs to set the flag
        write_flag = true ;
     }
}
WINAPI DWORD writer(LPVOID lpParam)
{
    ofstream *f_out = static_cast<ofstream*>(lpParam) ;
    while(1)
    {
        if(write_flag)
        {
            f_out->write(out_buffer0, chunk_len) ;
            f_out->write(out_buffer1, chunk_len) ;
            write_flag = false ;
            if(!finish)
            continue ;
            else
            return 0 ;
        }
    }
}

查找屏障模式的实现,如信号量小书第 3.5 节中所述。

屏障模式用于同步线程,就像您描述的那样。

C++没有

对多线程的直接支持(直到C++11)。您必须使用操作系统服务来实现多线程和同步。在Windows上有一组丰富的同步功能。对于你的方案,请查看等待函数和事件函数。将SetEventWaitForMultipleObjects结合起来将是一个可行的解决方案。

《信号量小书》这本书还不错,但是它集中在一般的编程方面,而不仅仅是我预期的C++。但是这本书帮助了我,因为我比没有它更快地找到了详细的C++屏障模式解释。读完这篇文章后:

http://adilevin.wordpress.com/2009/06/04/locking-mechanisms/

而这个:

http://adilevin.wordpress.com/category/multithreading/(屏障主要功能部分)

只需要花一点时间来解决我的问题。我通过使用一个bool标志、Semaphore对象和主要WaitForSingleObject()调用的某种组合来解决它,如下面的代码所示。我确定它可以工作,因为在运行时没有断言错误。它是类似于我的应用程序代码的完整代码,但这个代码仅代表我解决问题的方式。如果您对这段代码有任何建议 - 如果它可以以更好的方式实现,那么请回答。

#include <iostream>
#include <conio.h>
#include <stdio.h>
#include <windows.h>
#include <sstream>
#include <cassert>
#define THREADS_NUM 3
WINAPI DWORD generator(LPVOID lpParam) ;
WINAPI DWORD writer(LPVOID lpParam) ;
void dataGen(int id) ;
void locker() ;
void sync_msg_display(std::string msg) ;
volatile bool write_flag = false, finish = false ;
volatile long entered_num ;
int time0 = 950, time1 = 1050, time2 = 550 ;
HANDLE sem0, sem1, sem2 ;
using namespace std ;
int main(void)
{
    HANDLE trdHandles[THREADS_NUM] ;
    int IDs[THREADS_NUM] ;
    for(int aa = 0; aa < THREADS_NUM; aa++)
    IDs[aa] = aa ;
    entered_num = 0 ;
    sem0 = CreateSemaphore(NULL, 0, 4096, NULL) ;
    for(int aa = 0; aa < THREADS_NUM - 1; aa++)
    trdHandles[aa] = CreateThread(NULL, 0, generator, &IDs[aa], 0, NULL) ;
    trdHandles[THREADS_NUM - 1] = CreateThread(NULL, 0, writer, &IDs[THREADS_NUM - 1], 0, NULL) ;
    for(int aa = 0; aa < THREADS_NUM; aa++)
    if(trdHandles[aa] == NULL)
    ExitProcess(0) ;
    WaitForMultipleObjects(THREADS_NUM, trdHandles, true, INFINITE) ;
    for(int aa = 0; aa < THREADS_NUM; aa++)
    CloseHandle(trdHandles[aa]) ;
    CloseHandle(sem0) ;
    CloseHandle(sem1) ;
    CloseHandle(sem2) ;
    cout << "finished !" << endl ; 
    getch() ;
    return 0 ;
}
WINAPI DWORD generator(LPVOID lpParam)
{
int id = *(static_cast<int*>(lpParam)) ;
dataGen(id) ;
return 0 ;
}
WINAPI DWORD writer(LPVOID lpParam)
{
    LONG prev ;
    sem1 = CreateSemaphore(NULL, 0, 4096, NULL) ;
    sem2 = CreateSemaphore(NULL, 0, 4096, NULL) ;
    while(1)
    {
        WaitForSingleObject(sem1, INFINITE) ;
        write_flag = true ;
        sync_msg_display("Write started.") ;
        Sleep(time2) ;
        sync_msg_display("Write finished.") ;
        write_flag = false ;
        ReleaseSemaphore(sem2, 2, &prev) ;
        if(finish)
        return 0 ;
    }
}
void dataGen(int id)
{
    LONG prev ;
    stringstream ss ;
    for(int aa = 0; aa < 20; aa++)
    {
        if(id == 0)
        {
            ss << aa ;
            sync_msg_display("Generator thread no. 0 iteration no. " + ss.str() + " start.") ;
            ss.str("") ;
            if(aa % 2)
            Sleep(time0) ;
            else
            Sleep(time1) ;
            ss << aa ;
            sync_msg_display("Generator thread no. 0 iteration no. " + ss.str() + " complete.") ;
            ss.str("") ;
        }
        else
        {
            ss << aa ;
            sync_msg_display("Generator thread no. 1 iteration no. " + ss.str() + " start.") ;
            ss.str("") ;
            if(aa % 2)
            Sleep(time1) ;
            else
            Sleep(time0) ;
            ss << aa ;
            sync_msg_display("Generator thread no. 1 iteration no. " + ss.str() + " complete.") ;
            ss.str("") ;
        }
        if(write_flag) // don't generate data too fast. Wait for writes to complete (this flag is initially set to 'false')
        WaitForSingleObject(sem2, INFINITE) ;
        assert(!write_flag) ;
        Sleep(10) ; ////
        assert(!write_flag) ;
        locker() ;
        if(id == 0) // only one thread needs to set the flag
        ReleaseSemaphore(sem1, 1, &prev) ;
    }
    locker() ;
    if(id == 0)
    finish = true ;
}
void locker()
{
    LONG prev ;
    if(InterlockedIncrement(&entered_num) < 2)
    WaitForSingleObject(sem0, INFINITE) ;
    else
    {
        entered_num = 0 ;
        ReleaseSemaphore(sem0, 1, &prev) ;
    }
}
void sync_msg_display(string msg)
{
    HANDLE lock = CreateMutex(NULL, FALSE, "mutex") ;
    WaitForSingleObject(lock, INFINITE) ;
    cout << msg << endl ;
    ReleaseMutex(lock) ;
    CloseHandle(lock) ;
}