1. C++ / Говнокод #14156

    +13

    1. 01
    2. 02
    3. 03
    4. 04
    5. 05
    6. 06
    7. 07
    8. 08
    9. 09
    10. 10
    11. 11
    12. 12
    13. 13
    14. 14
    15. 15
    16. 16
    17. 17
    18. 18
    19. 19
    20. 20
    21. 21
    22. 22
    23. 23
    24. 24
    25. 25
    26. 26
    27. 27
    28. 28
    29. 29
    30. 30
    31. 31
    32. 32
    33. 33
    34. 34
    35. 35
    36. 36
    37. 37
    38. 38
    39. 39
    40. 40
    41. 41
    42. 42
    43. 43
    44. 44
    45. 45
    46. 46
    47. 47
    typedef std::queue<Msg> Queue;    
        
    
    struct SharedQueue
    {
        private:
            Queue m_queue;
            boost::mutex m_mux;
            boost::condition_variable m_condvar;    
        private:
            struct is_empty
            {
                Queue& queue;
                is_empty( Queue& q):
                    queue(q)
                {
                }
    
                bool operator()() const
                {
                    return !queue.empty();
                }
            };
        public:
            void push(const Msg& msg)
            {
                boost::mutex::scoped_lock lock(m_mux);
                m_queue.push( msg);
                m_condvar.notify_one();
            }
    
            bool try_pop( Msg& msg, Kind kind)
            {
                boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds( 30000);
                boost::mutex::scoped_lock lock( m_mux);
                if ( m_condvar.timed_wait( lock, timeout, is_empty( m_queue)))
                {
                    if( !m_queue.empty() && m_queue.front().kind == kind)
                    {
                        msg = m_queue.front();
                        m_queue.pop();
                        return true;
                    }
                }
                return false;
            }
    };

    Это ж пипец, дорогие товарищи...

    Запостил: blackhearted, 29 Ноября 2013

    Комментарии (37) RSS

    • Было бы очень неплохо, если бы тред посетил Морган Фриман.
      Ну и еще, хотелось бы узнать у знающих людей: а зачем писать кю, неужели каких-нибудь MPI / OpenMP не достаточно / нету уже готовых?
      Ответить
      • > неужели каких-нибудь MPI / OpenMP не достаточно / нету уже готовых?
        no.

        Правда, для желающих есть boost.lockfree.
        Ответить
    • > Это ж пипец, дорогие товарищи...
      Где пипец, ткни меня в него носом плиз? Самая обыкновенная блокирующая очередь на mutex и wait condition. Медленная (хотя вроде как обгонит хвалёные локфри при высокой конкуренции), тупая, зато наглядная и пуленепробиваемая.

      Ну разве что предикат сдуру назвали is_empty вместо is_not_empty; в строке 29 notify кидают всегда, а не только при вставке в пустую очередь; ну и в строке 38 лишняя проверка на пустоту.

      Меня вот реально смущает только m_queue.front().kind == kind в 38 строке. Видимо каждый worker достает сообщения нужного ему типа? Почему бы тогда не сделать по одной очереди на каждый тип?
      Ответить
      • про 38 не согласен ;)
        есть же spurious wakeup у condition variables
        Ответить
        • Там все нужные проверки уже есть в данной перегрузке timed_wait(), которая эквивалентна вот такому коду:
          while(!pred())
          {
              if(!timed_wait(lock,abs_time))
              {
                  return pred();
              }
          }
          return true;
          Пруф: http://www.boost.org/doc/libs/1_41_0/doc/html/thread/synchronization.html

          Так что проверка !m_queue.empty() в 38 строке лишняя.
          Ответить
        • А вот в чем я тупанул - так это в том, что если мы будем вызывать notify_one() только при вставке в пустую очередь, то, походу, стоит заменить его на notify_all().

          Иначе, если producer вставляет несколько записей подряд, notify_one() разбудит только одного consumer'а, и ему достанутся все вставленные записи, что совсем не айс. А если этот consumer умрет, не добравшись до конца очереди, то ее заклинит навсегда.
          Ответить
    • учитывая коммент бормана о бустовом timed_wait(), говно я вижу тут только в `m_queue.front().kind == kind`.

      тут надо было (например) N очередей делать, по штуке на тип. тогда бы и `notify_one()` (если имя не кривое) работал.
      Ответить
      • > тогда бы и `notify_one()` (если имя не кривое) работал.
        При условии, что на каждый тип ровно 1 consumer. Если несколько - то опять те же самые проблемы ;) И если у нас один consumer, то между notify_one() и notify_all() разницы никакой. Поэтому от notify_all() хуже точно не станет...
        Ответить
        • "При условии, что на каждый тип ровно 1 consumer. Если несколько - то опять те же самые проблемы ;)"

          Зачем будить всех консумеров если в очереди только одно сообщение?
          Ответить
          • Ну да, я туплю. В коде из топика нет проверки перед notify_one(), и оно будит один спящий поток на каждое сообщение. Поэтому все нормально.

            P.S. А вот m_queue.front().kind == kind и notify_one() несовместимы чуть более чем полностью ;)
            Ответить
            • > P.S. А вот m_queue.front().kind == kind и notify_one() несовместимы чуть более чем полностью ;)
              Хотя оно и с notify_all() несовместимо... Именно в этой проверке kind'а и таится страшное говно ;)
              Ответить
              • другими словами: "все равно говно, но за вас я это фиксить не буду."
                Ответить
                • Ну а че фиксить. Фикс уже дважды (мной и вами) был описан выше: выпилить нахрен kind и разбить очередь на несколько :)

                  P.S. А очередь на двух семафорах вроде же работает быстрее, чем на мутексе и wait condition?
                  Ответить
                  • не изголялся. учитывая семантику wait condition'ов, меня это сильно не удивит.

                    я на последнем проекте делал thread pool c MRU - вот это конкретно помогало.

                    к слову. я уже и забыл про еще одно говно: Msg копируется в/из очереди.
                    Ответить
                    • Каким боком MRU к thread pool можно использовать?
                      Ответить
                      • из N потоков, следующая задача отдается потоку который только что закончил последнюю задачу (== most recently used thread).
                        Ответить
                        • А что это даёт? Стек горячий?
                          Ответить
                          • код, стэк и данные в кэше проца. привязка потока к процу редко меняется, только если проц какой простаивает и/или перегружен.

                            в случае FIFO (что по моим наблюдениям есть поведение по умолчанию с типичной реализацией очередей как сверху) то прога как бы по всем подряд процам скачет. а если пытаться трогать как можно меньше процов (== это как бы и была моя начальная идея), типа как бы LIFO сэмулировать заставить, то вероятность что код/данные еще в стэке намного выше.

                            (FIFO/LIFO я имею в виду с точки зрения тред-пула, и применяю термины к списку idle потоков.)

                            на наших тестах как минимум 5% производительности добавило. (на тестах со средней и низкой загрузками, загрузка CPU ушла с типичной ~20% на >5%.)
                            Ответить
                            • Зачем FIFO? Почему эта типичная? Это как-то не логично.

                              Да же LIFO не настолько логично.

                              Нужно брать последний завершившийся поток, выполнявший задание для нашего ядра среди списка потоков, назначенных на другое ядро.
                              Ответить
                              • "Нужно брать последний завершившийся поток, выполнявший задание для нашего ядра среди списка потоков, назначенных на другое ядро."

                                Удачи это портабельно реализовывать.

                                На уровне ядра таким еще можно попытатся поизголятся, но на уровне юзвер-спэйс, выше скедулера не прыгнешь.
                                Ответить
                          • По моему в принципе сложно сделать как-то иначе. Есть std::forward_list потоков в реализации пула потоков. Поток извлекается когда запускается и кладется обратно когда останавливается. Можно конечно когда возвращаешь поток класть в конец списка, но для этого нужно быть знатным извращенцем и пессимизатором.
                            Ответить
                  • А как сделать на двух семафорах?
                    Ответить
                    • Да я тупанул походу. Там один хер еще мутекс:
                      // push
                      used.aquire();
                      synchronized (queue) {
                          queue.enqueue(item);
                      }
                      free.release();
                      // pop
                      free.aquire();
                      synchronized (queue) {
                          result = queue.dequeue(item);
                      }
                      used.release();
                      Ответить
                      • > Да я тупанул походу. Там один хер еще мутекс:

                        и это опять не работает :)

                        традиционная реализации которые я видел как правило пользовались cyclic buffer, вместо обычной очереди. там тогда обычных атомиков для индексов хватает.

                        семафоры нужны преимущественно для того что организовать wake up для получателей.

                        или что именно у тебя не работает: push()ей может быть только N, где N есть начальное число семафора `used`.
                        Ответить
                        • > push()ей может быть только N, где N есть начальное число семафора `used`.
                          Ограничение на длину очереди это фича ;) Часто лучше кинуть сообщение о перегрузке, чем бесконечно растягивать очередь.

                          > как правило пользовались cyclic buffer
                          Ну да, согласен, с ним лучше. Тогда очередь вообще будет работать как wait-free пока не опустеет или не переполнится.
                          Ответить
                          • > это фича ;)

                            начинается... :)
                            Ответить
                            • Очереди с неограниченной длиной между потоками не нужны. Два потока запускаются и работают и если поток приемник работает медленнее потока источника - приложение обязательно упадет. Это лишь вопрос времени.
                              Ответить
                              • я просто прикалывался. :)

                                я это уже лет пять пытаюсь в моей конторе толкать, но у народа просто панический страх на ограничения.
                                Ответить
    • показать все, что скрыто-
      Ответить

    Добавить комментарий