- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
typedef std::queue<Msg> Queue;
struct SharedQueue
{
private:
Queue m_queue;
boost::mutex m_mux;
boost::condition_variable m_condvar;
private:
struct is_empty
{
Queue& queue;
is_empty( Queue& q):
queue(q)
{
}
bool operator()() const
{
return !queue.empty();
}
};
public:
void push(const Msg& msg)
{
boost::mutex::scoped_lock lock(m_mux);
m_queue.push( msg);
m_condvar.notify_one();
}
bool try_pop( Msg& msg, Kind kind)
{
boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds( 30000);
boost::mutex::scoped_lock lock( m_mux);
if ( m_condvar.timed_wait( lock, timeout, is_empty( m_queue)))
{
if( !m_queue.empty() && m_queue.front().kind == kind)
{
msg = m_queue.front();
m_queue.pop();
return true;
}
}
return false;
}
};
Ну и еще, хотелось бы узнать у знающих людей: а зачем писать кю, неужели каких-нибудь MPI / OpenMP не достаточно / нету уже готовых?
no.
Правда, для желающих есть boost.lockfree.
Где пипец, ткни меня в него носом плиз? Самая обыкновенная блокирующая очередь на mutex и wait condition. Медленная (хотя вроде как обгонит хвалёные локфри при высокой конкуренции), тупая, зато наглядная и пуленепробиваемая.
Ну разве что предикат сдуру назвали is_empty вместо is_not_empty; в строке 29 notify кидают всегда, а не только при вставке в пустую очередь; ну и в строке 38 лишняя проверка на пустоту.
Меня вот реально смущает только m_queue.front().kind == kind в 38 строке. Видимо каждый worker достает сообщения нужного ему типа? Почему бы тогда не сделать по одной очереди на каждый тип?
есть же spurious wakeup у condition variables
Так что проверка !m_queue.empty() в 38 строке лишняя. Пруф:
Иначе, если producer вставляет несколько записей подряд, notify_one() разбудит только одного consumer'а, и ему достанутся все вставленные записи, что совсем не айс. А если этот consumer умрет, не добравшись до конца очереди, то ее заклинит навсегда.
тут надо было (например) N очередей делать, по штуке на тип. тогда бы и `notify_one()` (если имя не кривое) работал.
При условии, что на каждый тип ровно 1 consumer. Если несколько - то опять те же самые проблемы ;) И если у нас один consumer, то между notify_one() и notify_all() разницы никакой. Поэтому от notify_all() хуже точно не станет...
Зачем будить всех консумеров если в очереди только одно сообщение?
P.S. А вот m_queue.front().kind == kind и notify_one() несовместимы чуть более чем полностью ;)
Хотя оно и с notify_all() несовместимо... Именно в этой проверке kind'а и таится страшное говно ;)
P.S. А очередь на двух семафорах вроде же работает быстрее, чем на мутексе и wait condition?
я на последнем проекте делал thread pool c MRU - вот это конкретно помогало.
к слову. я уже и забыл про еще одно говно: Msg копируется в/из очереди.
в случае FIFO (что по моим наблюдениям есть поведение по умолчанию с типичной реализацией очередей как сверху) то прога как бы по всем подряд процам скачет. а если пытаться трогать как можно меньше процов (== это как бы и была моя начальная идея), типа как бы LIFO сэмулировать заставить, то вероятность что код/данные еще в стэке намного выше.
(FIFO/LIFO я имею в виду с точки зрения тред-пула, и применяю термины к списку idle потоков.)
на наших тестах как минимум 5% производительности добавило. (на тестах со средней и низкой загрузками, загрузка CPU ушла с типичной ~20% на >5%.)
Да же LIFO не настолько логично.
Нужно брать последний завершившийся поток, выполнявший задание для нашего ядра среди списка потоков, назначенных на другое ядро.
Удачи это портабельно реализовывать.
На уровне ядра таким еще можно попытатся поизголятся, но на уровне юзвер-спэйс, выше скедулера не прыгнешь.
и это опять не работает :)
традиционная реализации которые я видел как правило пользовались cyclic buffer, вместо обычной очереди. там тогда обычных атомиков для индексов хватает.
семафоры нужны преимущественно для того что организовать wake up для получателей.
или что именно у тебя не работает: push()ей может быть только N, где N есть начальное число семафора `used`.
Ограничение на длину очереди это фича ;) Часто лучше кинуть сообщение о перегрузке, чем бесконечно растягивать очередь.
> как правило пользовались cyclic buffer
Ну да, согласен, с ним лучше. Тогда очередь вообще будет работать как wait-free пока не опустеет или не переполнится.
начинается... :)
я это уже лет пять пытаюсь в моей конторе толкать, но у народа просто панический страх на ограничения.
на воре шапка горит, йоба.
С этими 6ю человеками ;) Я один из анонимных ебней, минуснувших эти два коммента.