The fastest thread data exchange algorithm, effectively avoid lock competition - TwoQueues

Several points of attention in handling multi thread data sharing:

1, lock the competition: try to reduce the time and number of lock competition.

2, memory: as far as possible the use of allocated memory, reducing the number of memory allocation and release. As far as possible with continuous memory, reduce the share of memory.


Simple scheme for multi thread data exchange A:

Define a list, and then add the lock and unlock the place where all the list is operated.

Simple simulation code:

CSimpleQueue class
{
Public:
CSimpleQueue ()
{
InitializeCriticalSection (&m_crit);
}
~CSimpleQueue ()
{
DeleteCriticalSection (&m_crit);
}

/ / add data
AddData void (pData DATA)
{
EnterCriticalSection (&m_crit);
M_listDATA.push_back (pData);
LeaveCriticalSection (&m_crit);
}
/ / get data
GetData bool (data DATA)
{
EnterCriticalSection (&m_crit);
If (m_listDATA.size () > 0)
{
Data = m_listDATA.front ();
M_listDATA.pop_front ();
LeaveCriticalSection (&m_crit);

True return;
}
Else
{
LeaveCriticalSection (&m_crit);
False return;
}
}

Private:
M_listDATA list<DATA>;
M_crit CRITICAL_SECTION;
};

Multi thread data exchange scheme B - TwoQueues:

Finally, the article has a detailed implementation of TwoQueues source code.

TwoQueues implementation analysis:

Lock competition: TwoQueues use double memory block switch to reduce the lock competition, data writing will be carried out when the lock and unlock operation, but read the data, only when the data read the queue before the end of the data is read, the lock and unlock the two queue exchange. It can be said that the data read is not a plus lock. Thus, the maximum reduction of the lock competition.

Memory: TwoQueues uses a pre allocated memory block to initialize the TwoQueues when it has been allocated to the memory of the data. After the data is stored in the memory block that has been allocated. There is no need to allocate memory again, and no memory allocation is performed.

Further optimization of the TwoQueues, TwoQueues in the storage of data, the data can be completely imitate the length of the data. But this approach will increase the efficiency of data detection. TwoQueues uses a linked list to store data.

Linked list structure:

Struct tagDataHead typedef
{
TagDataHead ()
{
PHead = NULL;
NLen = 0;
PNext = NULL;
}
ReSet void ()
{
PHead = NULL;
NLen = 0;
PNext = NULL;
}
PHead char*;
Int nLen unsigned;
PNext tagDataHead*;
}DATAHEAD;

This chain store the data head pointer and data length. When there is data in the TwoQueues, you can take the data directly through the linked list node. Because each time that you create a linked list is the need for application of memory, and memory application is a more cost efficient to things, TwoQueues again the a little processing, when the chain header does not exist, will realloc calls, one for malloc size chain header structure. And list from the list on the solution and does not release memory but placed in a pFreeDataHead list, when the need to apply chain header structure, first from pFreeDataHead linked list on the take linked list structure. This process reduces the number of memory allocation and release, and improves the efficiency of multi thread data sharing.

The efficiency of TwoQueues has been greatly improved by the optimization process.

Memory shared test before. A thread to write data, read data B.

Debug versionRelease version
TwoQueues 80-100 million times per second 170-180 million times per second
CSimpleQueues 20 times / sec 5 times / sec

Dual queue:

Create object:

M_cTwoQueues CTwoQueues;

/ / initializing double queue size, because it is a double queue, so the memory footprint is two.

M_cTwoQueues.Init (0x0FFFFFFF);

Writing method:

M_cTwoQueues.PushData (SZ, strlen (SZ) +1);

Read mode:

Void* pData const = NULL;
Int nLen unsigned = 0;

If (m_cTwoQueues.PrepareData (pData, nLen))
{
/ / data processing ...

The data confirm / / (drop)
M_cTwoQueues.ConfimData ();
}

Here are all the source code / TwoQueuesSource code download

Once #pragma
<assert.h> #include

ClwCore namespace
{
MALLOC_SIZE #define 128

Struct tagDataHead typedef
{
TagDataHead ()
{
PHead = NULL;
NLen = 0;
PNext = NULL;
}
ReSet void ()
{
PHead = NULL;
NLen = 0;
PNext = NULL;
}
PHead char*;
Int nLen unsigned;
PNext tagDataHead*;
}DATAHEAD;

Struct tagDataMem typedef
{
TagDataMem (int nSize unsigned)
{
If (nSize = 0)
{
Assert (false);
Return;
}

NMaxSize = nSize;
PDataMem = (char*) malloc (nSize*sizeof (char));
NDataPos = 0;

If (NULL = pDataMem)
{
For malloc / / CTwoQueues memory failure.
Assert (false);
}

PListDataHead = NULL;
PCurDataHead = NULL;
PFreeDataHead = NULL;
}
~tagDataMem ()
{
Release / node
ReSet ();
While (NULL = pFreeDataHead)
{
PTempDataHead DATAHEAD* = pFreeDataHead;
PFreeDataHead = pFreeDataHead->pNext;

PTempDataHead delete;
PTempDataHead = NULL;
}

Free (pDataMem);
PDataMem = NULL;
NDataPos = 0;
}
ReAlloc bool ()
{
For (short i=0 unsigned; i<MALLOC_SIZE; ++i)
{
PTempDataHead new = DATAHEAD DATAHEAD*;
(//pTempDataHead->ReSet); / / structure has already been initialized

If (NULL = pTempDataHead)
{
Apply for / / DATAHEAD memory failure.
Assert (false);
False return;
}

PTempDataHead->pNext = pFreeDataHead;
PFreeDataHead = pTempDataHead;
}

True return;
}
GetDataHead DATAHEAD* ()
{
If (NULL = pFreeDataHead)
{
PTempDataHead DATAHEAD* = pFreeDataHead;
PFreeDataHead = pFreeDataHead->pNext;

PTempDataHead return;
}

If (ReAlloc ())
{
If (NULL = pFreeDataHead)
{
PTempDataHead DATAHEAD* = pFreeDataHead;
PFreeDataHead = pFreeDataHead->pNext;

PTempDataHead return;
}
}

("GetDataHead / / ASSERT return NULL. ");
Assert (false);
NULL return;
}
Unsigned int (GetFreeLen) / free memory length
{
NMaxSize-nDataPos return;
}
PushData bool (pData unsigned, int nLen void*)
{
If (nDataPos+nLen = nMaxSize)
{
False return;
}

PTempDataHead DATAHEAD* = GetDataHead ();

If (NULL = pTempDataHead)
{
False return;
}

The data structure / head structure
PTempDataHead->pHead = (pDataMem+nDataPos);
PTempDataHead->nLen = nLen;
PTempDataHead->pNext = NULL;

/ / copy data
Memcpy (pDataMem+nDataPos, pData, nLen);
NDataPos = nLen;

If (NULL = pListDataHead)
{
PListDataHead = pTempDataHead;
PCurDataHead = pTempDataHead;
True return;
}
Else
{
PCurDataHead->pNext = pTempDataHead;
PCurDataHead = pCurDataHead->pNext;
True return;
}
}

Bool (IsEmpty) / judge whether data
{
Return (NULL==pListDataHead) true:false;
}

bool(const void * & prepareData pdata,unsigned int和nlen)/准备一条数据
{
如果(空!= plistdatahead)
{
数据= plistdatahead -> phead;
nlen = plistdatahead -> nlen;
返回真;
}
其他的
{
返回假;
}
}
无效confimdata() /删除一条数据
{
如果(空= = plistdatahead)
{
返回;
}

datahead * ptempdatahead = plistdatahead;
plistdatahead = plistdatahead -> pnext;

ptempdatahead -> reset();
ptempdatahead -> pnext = pfreedatahead;
pfreedatahead = ptempdatahead;
}
无效reset() /重置内存存储对象
{
而(空!= plistdatahead)
{
datahead * ptempdatahead = plistdatahead;
plistdatahead = plistdatahead -> pnext;

ptempdatahead -> reset();
ptempdatahead -> pnext = pfreedatahead;
pfreedatahead = ptempdatahead;
}

ndatapos = 0;
pcurdatahead = null;
}

char * pdatamem;/ /数据内存区域
unsigned int ndatapos;/ /数据存储位置
unsigned int nmaxsize;/ /最大存储区域大小

datahead * plistdatahead;
datahead * pcurdatahead;
datahead * pfreedatahead;/ /空闲头结构队列
} datamem;

类ctwoqueues
{
公众:
ctwoqueues(void)
{
InitializeCriticalSection(与m_crit);
m_pdatamempush = null;
m_pdatamempop = null;
}
~ ctwoqueues(void)
{
如果(空!= m_pdatamempush)
{
删除m_pdatamempush;
m_pdatamempush = null;
}

如果(空!= m_pdatamempop)
{
删除m_pdatamempop;
m_pdatamempop = null;
}
删除临界部分否是是(与m_crit);
}

公众:
初始化程序(unsigned int nsize)
{
如果(0 = = nsize)
{
/ /初始化ctwoqueues对象失败。
断言(假);
返回;
}

m_pdatamempush =新datamem(nsize);
m_pdatamempop =新datamem(nsize);
}

布尔PushData(void * pData,unsigned int nlen)
{
/ / unsigned int nfreelen = m_pdatamempush -> getfreelen();

布尔危险= false;

EnterCriticalSection(与m_crit);
危险= m_pdatamempush -> PushData(pdata,nlen);
保留前一个输入临界段(与m_crit);

返回危险;
}
bool(const void * & prepareData pdata,unsigned int和nlen)
{
布尔bcanread =真;
如果(m_pdatamempop -> isempty())
{
/ /队列没有数据了
EnterCriticalSection(与m_crit);
如果(m_pdatamempush -> isempty())
{
/ /推队列为空
保留前一个输入临界段(与m_crit);
bcanread = false;
}
其他的
{
m_pdatamempop -> reset();/ /充值读取队列
datamem * ptempdatamem = m_pdatamempush;
m_pdatamempush = m_pdatamempop;
m_pdatamempop = ptempdatamem;
保留前一个输入临界段(与m_crit);
bcanread =真;
}
}

如果(bcanread)
{
返回m_pdatamempop -> prepareData(pdata,nlen);
}
其他的
{
返回假;
}
}

无效confimdata()
{
m_pdatamempop -> confimdata();
}

私人的:
datamem * m_pdatamempush;
datamem * m_pdatamempop;
critical_section m_crit;

};
}



有关编译器优化的更完整信息,请参阅优化通知