中国开发网: 论坛: 程序员情感CBD: 贴子 91807
haitao
技术贴.C++.队列.多线程安全:有没有现成的安全可靠的、简单易用的队列、并发互斥锁机制
我使用C++标准的list,好像不使用信号量之类的锁机制也不安全

那么,有没有安全可靠的、简单易用的并发互斥锁机制

最好win32/linux通用


感觉这些东西是写多线程服务程序必备的,搞这方面的人应该肯定有现成的解决“方案”吧
即便不好公开代码,指点一下思路也是功德无量啊

贴一个blog上的代码:高人评析一下
http://blog.csdn.net/wwwooowww/archive/2004/12/10/211544.aspx


/*
BytesQueue.h
zhangggdlt
2004/11/15

Implements a queue that stores byte arrays.
*/
#ifndef _BYTES_QUEUE_H
#define _BYTES_QUEUE_H

#include <pthread.h>
#include <unistd.h>

/* Status codes returned (as ERR_NUMBER) by the BytesQueue member functions. */
/* The operation completed successfully. */
#define OPERATION_OK 0
/* inQueue found no free slot in the ring. */
#define QUEUE_FULL -1
/* outQueue found no stored element. */
#define QUEUE_EMPTY -2
/* increaseSize could not enlarge the slot table. */
#define INCREASE_FAILED -3
/* inQueue could not obtain memory for the element copy. */
#define NO_AREA -4
/* A required data pointer was NULL. */
#define POINT_NULL -5
/* pthread_mutex_trylock did not acquire the queue mutex. */
#define FAILED_LOCK -6


typedef int ERR_NUMBER;        /* status code, one of the macros above */
typedef unsigned char uint_8;  /* one byte of payload */

/*
BytesQueue is a circular (ring) queue whose elements are byte arrays.

- The ring has _size slots; one slot is always kept free to tell "full"
  from "empty", so at most _size - 1 elements are stored at a time.
- inQueue stores a private heap copy of the caller's bytes; outQueue copies
  the bytes back into a caller-supplied buffer.
- inQueue and outQueue guard the ring with QueMutex through
  pthread_mutex_trylock, so they return FAILED_LOCK instead of blocking
  when another thread holds the mutex.
- There is no destructor: the owner must call destroy() to release memory.
- The object owns raw memory and a mutex, so copying is disabled (the copy
  operations are declared private and never defined); a copy would share
  the slot table and free it twice.
*/
class BytesQueue
{
private:
    int _size;                 /* number of slots in the ring */
    int _head;                 /* index of the oldest stored element */
    int _rear;                 /* index of the next free slot */
    uint_8 **_buffer;          /* slot table; each used slot points to one record */
    pthread_mutex_t QueMutex;  /* protects _head, _rear and the slot table */

    /* Not copyable: declared but intentionally left undefined. */
    BytesQueue(const BytesQueue&);
    BytesQueue& operator=(const BytesQueue&);

public:
    /* Creates an empty queue with `size` slots. */
    BytesQueue(int size=512);
    /* Adds `size` slots to the ring; returns a status code. */
    ERR_NUMBER increaseSize(int size=512);
    /* Appends a copy of data[0..len); returns a status code. */
    ERR_NUMBER inQueue(const uint_8 *data, int len);
    /* Removes the oldest element into `data` and stores its length in `len`. */
    ERR_NUMBER outQueue(uint_8 *data, int &len);

    /* Frees every stored element and the slot table. */
    void destroy();
    /* Prints a human-readable message for a status code. */
    void errMessage(ERR_NUMBER err);
    /* Prints size, head, rear and slot-table address of `bq`. */
    void showBytesQueue(BytesQueue& bq);
};



#endif //_BYTES_QUEUE_H

——————————————————————————————————————
/*
BytesQueue.cpp
zhangggdlt
2004/12/9

Implements a queue of byte arrays that supports multithreading and synchronization.
*/
#include <stdio.h>
#include <string.h>
#include <new>
#include "BytesQueue.h"


/*
Constructor.
Builds an empty ring with `size` slots and initialises the mutex that
inQueue/outQueue use for synchronisation between threads.

size: requested number of slots. One slot always stays free, so the queue
      holds at most size - 1 elements. Values below 2 are raised to 2,
      because a smaller ring could never store an element and a size of 0
      would make every index computation divide by zero.
*/
BytesQueue::BytesQueue(int size) //size = 512
{
    if (size < 2)
        size = 2;

    this->_size = size;
    // Standard array-new syntax; the trailing () null-initialises every slot.
    this->_buffer = new uint_8*[this->_size]();
    this->_head = 0;
    this->_rear = 0;
    pthread_mutex_init(&QueMutex, NULL);
}

/*
Enlarges the ring by `size` additional slots without losing stored elements.

The stored elements are moved, oldest first, to the start of a new slot
table, so afterwards _head is 0 and _rear equals the element count.

size: number of slots to add; must be positive.

Returns:
  OPERATION_OK    - the ring was enlarged.
  INCREASE_FAILED - size was not positive, the queue has no slots (for
                    example after destroy()), or the new table could not
                    be allocated. The queue is left unchanged.
  FAILED_LOCK     - another thread holds the queue mutex.
*/
ERR_NUMBER BytesQueue::increaseSize(int size) //size = 512
{
    if (size <= 0)
        return INCREASE_FAILED;

    // Same non-blocking locking policy as inQueue/outQueue: the table must
    // not be swapped while another thread is reading or writing a slot.
    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    const int oldSize = this->_size;
    if (oldSize <= 0)
    {
        pthread_mutex_unlock(&QueMutex);
        return INCREASE_FAILED;
    }

    const int newSize = oldSize + size;
    // nothrow so that allocation failure is reported through the return code.
    uint_8 **grown = new (std::nothrow) uint_8*[newSize]();
    if (!grown)
    {
        pthread_mutex_unlock(&QueMutex);
        return INCREASE_FAILED;
    }

    // _rear is the next free slot, so the element count has no "+ 1".
    const int count = (this->_rear - this->_head + oldSize) % oldSize;
    for (int k = 0; k < count; k++)
    {
        // Source indices wrap with the OLD size; destinations are linear.
        grown[k] = this->_buffer[(this->_head + k) % oldSize];
    }

    delete [] this->_buffer;
    this->_buffer = grown;
    this->_size = newSize;
    this->_head = 0;
    this->_rear = count;

    pthread_mutex_unlock(&QueMutex);
    return OPERATION_OK;
}

/*
Appends one element (a byte array) to the queue.

The bytes are copied into a newly allocated record laid out as a 4-byte
length prefix followed by the payload, which is the layout outQueue reads.

data: bytes to store; may be NULL only when len is 0.
len:  number of bytes in data; must not be negative.

Returns:
  OPERATION_OK - the element was stored.
  POINT_NULL   - data was NULL although len was non-zero.
  NO_AREA      - len was negative or memory for the record was unavailable.
  FAILED_LOCK  - another thread holds the queue mutex (this call never blocks).
  QUEUE_FULL   - no free slot is left.
*/
ERR_NUMBER BytesQueue::inQueue(const uint_8 *data, int len)
{
    // Argument checks need no shared state, so do them before locking.
    if (len < 0)
        return NO_AREA;
    if (!data && len != 0)
        return POINT_NULL;

    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    if ((this->_rear+1)%this->_size == this->_head)
    {
        printf("The queue is full!\n");
        pthread_mutex_unlock(&QueMutex);
        return QUEUE_FULL;
    }

    // nothrow makes the NO_AREA path reachable; plain new would throw.
    // The size is computed in size_t so that len + 4 cannot overflow int.
    uint_8 *record = new (std::nothrow) uint_8[static_cast<size_t>(len) + 4];
    if (!record)
    {
        pthread_mutex_unlock(&QueMutex);
        return NO_AREA;
    }

    memcpy(record, &len, 4);
    if (len > 0)
        memcpy(record + 4, data, len);

    this->_buffer[this->_rear] = record;
    this->_rear = (this->_rear + 1) % this->_size;
    pthread_mutex_unlock(&QueMutex);

    return OPERATION_OK;
}

/*
Removes the oldest element from the queue and hands its bytes to the caller.

data: caller-owned buffer that receives the payload. It must be large
      enough for the stored element; the size is not checked here.
len:  receives the number of payload bytes copied into data.

Returns:
  OPERATION_OK - an element was copied out and its storage released.
  FAILED_LOCK  - another thread holds the queue mutex (this call never blocks).
  POINT_NULL   - data was NULL.
  QUEUE_EMPTY  - there is no element to remove.
*/
ERR_NUMBER BytesQueue::outQueue(uint_8 *data, int &len)
{
    if (pthread_mutex_trylock(&QueMutex))
    {
        printf("Try lock failed!\n");
        return FAILED_LOCK;
    }

    if(!data)
    {
        pthread_mutex_unlock(&QueMutex);
        return POINT_NULL;
    }
    if(this->_head == this->_rear)
    {
        printf("The queue is empty!\n");
        pthread_mutex_unlock(&QueMutex);
        return QUEUE_EMPTY;
    }

    // Record layout: 4-byte length prefix, then the payload.
    uint_8 *record = this->_buffer[this->_head];
    memcpy((void*)&len, record, 4);
    memcpy((void*)data, record + 4, len);

    // The record was allocated with new[] when it was enqueued; free it now
    // that its bytes have been handed over, otherwise every pop leaks it.
    delete [] record;
    this->_buffer[this->_head] = NULL;

    this->_head = (this->_head + 1) % this->_size;
    pthread_mutex_unlock(&QueMutex);

    return OPERATION_OK;
}
/*
This function is use to set free the data structure.

*/
void BytesQueue::destroy()
{
while (this->_head != this->_rear)
{
delete [](this->_buffer[this->_head]);
this->_head = (this->_head + 1) % this->_size;
}
delete [](this->_buffer);
this->_size = 0;
this->_buffer = NULL;
this->_head = 0;
this->_rear = 0;
}

/*
Test helper: prints one line describing a status code.

err: a status code returned by one of the queue operations. Codes without
     a dedicated message (including NO_AREA, POINT_NULL and FAILED_LOCK)
     produce the generic "other things are wrong" line.
*/
void BytesQueue::errMessage(ERR_NUMBER err)
{
    const char *text;

    if (err == OPERATION_OK)
        text = " push is ok!\n";
    else if (err == QUEUE_FULL)
        text = " push failed! The queue is full!!\n";
    else if (err == QUEUE_EMPTY)
        text = " pop failed! The queue is empty!!\n";
    else if (err == INCREASE_FAILED)
        text = " increase queue size failed! \n";
    else
        text = " other things are wrong! \n";

    printf("%s", text);
}

/*
Prints the bookkeeping fields of a queue: slot count, head index, rear
index and the address of the slot table.

bq: the queue to describe. The fields of bq are printed, not those of the
    object the method is called on. No lock is taken, so the values may be
    stale while other threads are using the queue.
*/
void BytesQueue::showBytesQueue(BytesQueue& bq)
{
    printf(" %s\n", "The info of the BytesQueue is :");
    printf(" size : %d\n", bq._size);
    printf(" head : %d\n", bq._head);
    printf(" rear : %d\n", bq._rear);
    // %p is the only printf conversion defined for pointers; passing a
    // pointer to %x is undefined and truncates on 64-bit targets.
    printf(" buf addr : %p\n", static_cast<void*>(bq._buffer));
}



/*
using namespace NetworkProtocols;


//this is a good example to show how to use the data structure BytesQueue.

int main()
{
int len,i;
char ch;
ERR_NUMBER err;
uint_8 bufi[]={1,2,3,4,5,6,7,8,9,0};
uint_8 bufo[10];
BytesQueue bs;
bs.showBytesQueue(bs);

ch = getchar();
while(ch != 'q')
{
switch(ch)
{
case 'i':
err = bs.inQueue(bufi, 10);
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'o':
err = bs.outQueue(bufo, len);
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'e':
err = bs.increaseSize();
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'h':
printf("....................Help................\n");
printf(" i: go into an array into Queue.\n");
printf(" o: go out of an array out of the queue.\n");
printf(" e: enlarge the size of the queue.\n");
printf(" h: help\n");
printf(" q: quit the system.\n");
ch = getchar();
break;

default:
if (ch != '\n')
printf("...........Your input is wrong! Again!..............\n");

ch = getchar();
break;
}

}
bs.destroy();
bs.showBytesQueue(bs);
return 0;
}
*/
————————————————————————————————————————
//main.cpp
#include <stdio.h>
#include "BytesQueue.h"


/*
Argument bundle handed to each worker thread through pthread_create.
Member order matters: main() fills these with positional aggregate
initialisers.
*/
struct MyParameter
{
    int id;          /* number printed in the worker's progress line */
    BytesQueue *bq;  /* queue the worker operates on */
    uint_8 *buf;     /* bytes to enqueue, or buffer receiving dequeued bytes */
    int len;         /* byte count to enqueue, or receives the dequeued length */
    int delay;       /* pause between iterations, passed to usleep() */
};



/* Handles of the worker threads; main() creates and joins entries 0..3. */
pthread_t threads[5];
/* NOTE(review): not referenced by any code visible in main.cpp; BytesQueue
   has its own member of the same name. Presumably a leftover - confirm. */
pthread_mutex_t QueMutex;
/* Thread attributes shared by all pthread_create calls in main(). */
pthread_attr_t attr;


void *inQueue(void* pvar)
{
int i = 1;
MyParameter *para = (MyParameter*)pvar;


while( i )
{

printf("Thread inQue: %d is working! \n", para->id);
para->bq->inQueue(para->buf, para->len);
para->bq->showBytesQueue(*(para->bq));
//para->bs->push(para->buf,para->len);
//para->bs->showBytesStack(*(para->bs));

usleep(para->delay);
i ++;

}
pthread_exit(NULL);
}

void *outQueue(void* pvar)
{
int i = 1;
MyParameter *para = (MyParameter*)pvar;


while( i )
{

printf("-------------Thread outQue: %d is working! \n", para->id);
para->bq->outQueue(para->buf, para->len);
para->bq->showBytesQueue(*(para->bq));
//para->bs->pop(para->buf,para->len);
//para->bs->showBytesStack(*(para->bs));
usleep(para->delay);
i ++;

}
pthread_exit(NULL);
}

/*
Demo driver: starts three producer threads and one consumer thread that
share a single 100-slot BytesQueue, then waits for them.

The producers enqueue the same 66-byte sample buffer at different rates;
the consumer dequeues into a 100-byte buffer. Only threads that were
actually created are joined.

Returns 0 once every started worker has been joined.
*/
int main()
{
    uint_8 mybuf1[] = {
        0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50,
        0x58, 0x0D, 0x0D, 0x0D, 0x08, 0x00, 0x45, 0x00,
        0x00, 0x34, 0x00, 0xF2, 0x00, 0x00, 0x40, 0x11,
        0xB6, 0x65, 0xC0, 0xA8, 0x21, 0x0F, 0xC0, 0xA8,
        0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20,
        0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66,
        0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66,
        0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73,
        0x64, 0x66
    };
    uint_8 mybuf2[100];
    // Initialised because its value is copied into paras[3] below; reading
    // an uninitialised int is undefined behaviour.
    int len = 0;

    BytesQueue bq(100);

    MyParameter paras[4]={
        {0,&bq,mybuf1,66,1000000},
        {1,&bq,mybuf1,66,2000000},
        {2,&bq,mybuf1,66,3000000},
        {3,&bq,mybuf2,len,1000000}
    };

    // Entry point for each worker, in the same order as paras[].
    void *(*const workers[4])(void *) = { inQueue, inQueue, inQueue, outQueue };

    pthread_attr_init(&attr);

    int started = 0;
    for (int i = 0; i < 4; i++)
    {
        if (pthread_create(&threads[i], &attr, workers[i], (void *)&paras[i]) != 0)
        {
            printf("Failed to create thread %d!\n", i);
            break;
        }
        started++;
    }

    // Joining a thread that was never created is undefined, so stop at
    // the number that really started.
    for (int i = 0; i < started; i++)
    {
        pthread_join(threads[i], NULL);
    }
    pthread_attr_destroy(&attr);

    bq.destroy();

    printf("ok!!\n");

    // Every worker has been joined, so a plain return is sufficient; a
    // pthread_exit() here would only make this return unreachable.
    return 0;
} //end of main
____________________________________________________________________________________
zhangggdlt
2004.12.10
(完)




版权声明:CSDN是本Blog托管服务提供商。如本文牵涉版权问题,CSDN不承担相关责任,请版权拥有者直接与文章作者联系解决。

[点击此处收藏本文]
发表于2004年12月10日 11:33 AM

Feedback
# 回复:一个支持多线程同步循环队列的实现 2004-12-10 12:06 PM jzp_cn
good

# 回复:一个支持多线程同步循环队列的实现 2004-12-10 5:10 PM ilovevc
同样,你这个也不是线程安全的。

# 回复:一个支持多线程同步循环队列的实现 2004-12-12 10:41 AM zhangggdlt
现在改过了,这样对于inQueue 和 outQueue 实现了线程安全了,不会出现 ilovevc所说的现象了。我测试成功了,也不会发生死锁了。
再次感谢:ilovevc的提示。
我的blog:http://szhaitao.blog.hexun.com & http://www.hoolee.com/user/haitao
--以上均为泛泛之谈--
不尽牛人滚滚来,无边硬伤纷纷现 人在江湖(出来的),哪能不挨刀(总归是要的)
网络对话,歧义纷生;你以为明白了对方的话,其实呢?

您所在的IP暂时不能使用低版本的QQ,请到:http://im.qq.com/下载安装最新版的QQ,感谢您对QQ的支持和使用

相关信息:


欢迎光临本社区,您还没有登录,不能发贴子。请在 这里登录