Основы работы с потоками в языке Python

    Предисловие


    Данную статью я затеял написать после учащающихся вопросов как на форуме так и вопросов в icq на тему многопоточности в CPython. Проблема людей, которые их задают происходит, в основном, из незнания или непонимания основных принципов работы многопоточных приложений. По крайней мере, это относится к используемой мной модели многопоточности, которая носит название Thread Pool (Пул потоков). Часто встречаемой проблемой является и другое: люди не имеют элементарных навыков работы со стандартными модулями CPython. В статья я постараюсь привести примеры такого незнания, не останавливаясь на личностях, так как это по моему скромному мнению неважно. Исходя из условий, в которых пишется эта статья, то мы немного затронем и работу через proxy серверы (не путать с SOCKS).

    Детали


    Статья писалась в то время когда последними версиями CPython были: 2.6.2 для второй ветки и 3.1.1 для третьей ветки. В статье используются новоявленные в CPython 2.6 with statements, следовательно при желании ипользования этого кода в более ранних версиях придется переделывать код на своё усмотрение. В процессе написания данной статьи использовались только стандартные модули, доступные «из коробки». Также, исходя из того, что я являюсь не профессиональным программистом, а самоучкой, то прошу извинения у уважаемой аудитории за возможные неточности относительно трактования тех или иных понятий. Поэтому, приглашаю вас задавать вопросы, на которые я по возможности буду отвечать.

    Итак, приступим, фактически, то что я собрался описывать впервые было порекомендовано уважаемым lorien с python.su (правда в его примере Queue вообще в отдельном потоке обрабатывалось :)), не уверен что он автор продемонстрированного им концепта, но впервые я увидел это опубликованным именно от него, и являет собой даже скорее не Thread Pool, а Task Pool (хотя возможно я и не прав в трактовании сего термина).
    Что представляет собой многопоточное приложение? Это приложение, в котором определенное количество потоков выполняют некие задачи. Беда многих в том, что они не до конца улавливают то, что потоки действуют отдельно друг от друга до тех пор, пока активен главный поток. Лично я стараюсь писать таким образом, чтобы это им не мешало, но об этом позже. Также их проблемой является так называемый «индусский» код, который просто и бездумно откуда-то копируется, а программа доводится до уровня «лишь бы работало». Господа, усвойте раз и навсегда: если вы не понимаете, как работает тот или иной участок вашей программы, то перепишите его так, чтобы это было понятно ВАМ, если в будущем вы дорастете до понимания тех вещей, которые вы предполагали бездумно скопировать, то вам без проблем можно будет использовать этот код. Главным является именно ВАШЕ понимание того, как работает ваше творение.
    Затронем проблему отдельной работы потоков. Господа, взаимодействие потоков стоит продумывать до того как вы начинаете писать приложение, а не когда вы его уже написали. В принципе, если придерживаться некоторых правил работы с исходным кодом приложения, то переделывание программы из однопоточной в многопоточную происходит легко, безболезненно, и быстро.
    Касательно активности главного потока. Когда, как вам кажется, вы запускаете ОДИН поток, фактически работает уже ДВА потока. Нужно понимать, что количество потоков, активных в данный момент равняется количеству потоков, запущенных в данный момент вами +1 поток, в котором работает основное тело приложения. Лично я стараюсь писать таким образом, чтобы четко отделять основной поток от запущенных мной. Если этого не делать, то возможно преждевременное (как вам кажется) завершение работы приложения, хотя на самом деле приложение отработает именно так, как вы его написали.
    Вроде на словах понятно, теперь приступаем к практике. На практике в CPython есть такое понятние как GIL (Global Interpreter Lock). Под сим подразумевается глобальная блокировка интерпритатора в тот момент когда потоки вашего приложения обращаются к процессору. Фактически, в каждый отдельно взятый момент с процессором работает только один поток. В связи с этим максимальное количество потоков, которое вообще можно запустить в стандартном CPython колеблется в районе 350 штук.
    В качестве примера будет сделана попытка реализовать многопоточный парсер www.google.com. Как я уже написал выше, для работы будут использованы исключительно стандартные модули, для выполнения задачи понадобятся модули urllib2, urllib, queue, threading, re.

    По порядку:
    #==================<Имортирование необходимых модулей>==================
    import urllib2
    #Модуль для работы с протоколом HTTP, высокоуровневый
    import urllib
    #Модуль для работы с протоколом HTTP, более низкоуровневый чем urllib2, 
    #фактически из него необходима одна функция - urllib.urlquote
    from Queue import Queue
    #Модуль, который представляет собой "Pool", фактически это список, в 
    #котором на нужных местах вставлены замки таким образом, чтобы к нему 
    #одновременно мог обращаться только один поток
    import threading
    #Модуль для работы с потоками, из него понадобится только 
    #threading.active_count, threading.Thread, threading.Thread.start, 
    #threading.Rlock
    import re
    #Модуль для работы с регулярными выражениями, его использование выходит
    #за пределы статьи
    import time 
    #Модуль для работы со временем, из него нужна только функция sleep
    queue = Queue()
    #Обязательное присваивание, нужно делать именно так (т.е. импортировать
    #класс Queue из модуля Queue и инициализировать его)
    #==================</Имортирование необходимых модулей>=================

    #==============================<Настройки>==============================
    PROXY = "10.10.31.103:3128"
    #Во время написания статьи сижу за прокси-сервером, поэтому в статье 
    #затрагивается и этот вопрос, этой строкой обьявляется глобальная
    #переменная PROXY, в которой находится адрес прокси-сервера. Для работы 
    #напрямую необходимо указать значение None
    HEADERS = {"User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
               "Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
               "Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
               "Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1",
               "Accept-Encoding" : "identity, *;q=0",
               "Connection" : "Keep-Alive"}
    #Для того чтобы получить страницу с www.google.com НЕОБХОДИМО использовать
    #заголовки браузера, они представлены выше в ассоциативном массиве HEADERS, 
    #соответствуют реальным заголовкам браузера Opera с маленько модификацией, эти 
    #заголовки означают что клиент не может принимать zlib compressed data, т.е. 
    #сжатые данные - не хотел я заморачиваться еще и с разархивироанием страниц, тем 
    #более что не все сайты их сжимают...
    THREADS_COUNT = 10
    #В принципе это все настройки приложения, это-количество потоков
    DEEP = 30
    #Это - значение, которое отвечает за глубину страниц поиска, которые 
    #нужно просматривать, фактически же определяет собой количество ссылок, 
    #которые будут собраны сборщиком.
    ENCODING = "UTF-8"
    #Кодировка ваших файлов (для загрузки данных из файла с запросами и 
    #последующего их перевода в юникод)
    #==============================</Настройки>===================================

    LOCK = threading.RLock()
    # Вот тут то впервые и затрагивается модуль threading
    #создается обьект LOCK, который представляет собой класс threading.RLock из
    #модуля threading, это -простейший замок, который запрещает исполнение 
    #несколькими потоками участка кода который идет после вызова его метода 
    #acquire() Основным отличием threading.RLock от threading.Lock (тоже класс из 
    #модуля threading) является то, что каждый поток может обращаться к обьекту 
    #threading.RLock неограниченное количество раз, обьект threading.Lock может 
    #вызываться каждым потоком только единожды.



    Основным принципом для беспроблемной реализации многопоточности я считаю модульность кода. Не обязательно выносить используемые Вами функции в отдельные файлы или классы, достаточно чтобы хотя бы это были отдельные функции (Прошу простить за каламбур).

    Сейчас я выделю работу потока в отдельную функцию, пускай она будет представлять собой некое абстрактное понятие работы. На данном этапе пока что некоторые моменты будут непонятны, но позже все это выстроится в понятный алгоритм.

    def worker():
    # Обьявление функции worker, входных аргументов нет
        global queue
        #Здесь и далее я буду обьявлять функции из глобального пространства 
        #имен в локальном для лучшей читабельности кода, хотя в написании
        #софта такое делать строго не рекомендую (!)
        while True:
        #Запуск бесконечного цикла, в котором будет происходить работа
            try:
            #Обработка ошибок, блок try/except, когда обработается
            #ошибка QueueEmpty это значит, что список задач пуст, и поток 
            #должен завершить свою работу
                target_link =  queue.get_nowait() 
                #Эта строчка олицетворяет собой получение задачи потоком из
                #списка задач queue
            except Exception, error:
            #сам перехват ошибки
                return
                #Завершение работы функции
            parsed_data = get_and_parse_page(target_link)
            #Позже будет реализована функция, которая будет получать 
            #страницу и доставать из нее необходимые значения
            if parsed_data != "ERROR":
            #Проверка на то, была ли получена страница
                write_to_file(parsed_data)
                #Также будет реализована функция для записи собранных данных в файл
            else:
                queue.put(target_link)
                #Если страница не была получена, то забрасываем ее обратно в queue


    Главное, что нужно четко усвоить — это алгоритм работы самого потока, и что именно потоки должны обрабатывать независимо друг от друга. Итого, задачи потока очень просты — получить ссылку на страницу поиска, передать ее в функцию-обработчик, из которой вернутся ссылки на найденные сайты а также title этих сайтов, после записать ссылки и title в файл (все это будет находиться в parsed_data).

    По мере работы реализовываются задачи от простейшей к сложной, поэтому на этих этапах практически не затронуты вопросы многопоточности. Далее настала очередь реализации функции, которая будет отвечать за запись в файл.

    def write_to_file(parsed_data):
    #Обявление функции write_to_file, аргумент –массив данных для записи
        global LOCK
        global ENCODING
        LOCK.acquire()
        #"Накидывание замка", следующий далее участок кода может выполнятся
        #только одним потоком в один и тот же момент времени
        with open("parsed_data.txt""a"as out:
        #Используется with statement, открывается файл parsed_data.txt с
        #правами "a", что означает дозапись в конец файла, и присваиваевается
        #хэндлеру на файл имя out (я так привык)
            for site in parsed_data:
            #Проход циклом по всем элементам parsed data, имя активного в 
            #данный момент элемента будет site
                link, title = site[0], site[1]
                #Присваивание переменным link и title значений из кортежа site
                title = title.replace("<em>""").replace("</em>""").replace("<b>""").replace("</b>""")
                #.replace -это замена HTML-тэгов, которые проскакивают в title и совершено не нужны
                out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))
                #Производится сама запись в файл, используется оператор форматирования 
                #строк .format, в отличие от % он поддерживает именованные аргументы, чем я и не 
                #преминул воспользоваться, таким образом в файл пишется строка вида:
                #ссылка на сайт | title страницы\n -символ переноса строки(все это переводится
                #из юникода в cp1251)
        LOCK.release()
        #"Отпирание"  замка, в противном случае ни один из следующих 
        #потоков не сможет работать с этим участком кода. По-хорошему, тут тоже нужно 
        #сделать обработку ошибок, но это учебный пример, да и ошибка там может 
        #возникнуть (после добавки замка в этот участок кода) только если во время
        #работы приложения выставить атрибут “только чтение” для данного пользователя
        #относительно файла parsed_data.txt



    Далее идет реализация функции get_and_parse_page:
    def get_and_parse_page(target_link):
    #Обьявление функции, аргумент – ссылка на страницу
        global PROXY
        #Указывает на то, что в данной функции используется переменная PROXY
        #из глобального пространства имен
        global HEADERS
        #То же и для переменной Headers
        if PROXY is not None:
        #Если значение PROXY не равно None
            proxy_handler = urllib2.ProxyHandler( { "http"""+PROXY+"/" } )
            #Создается Прокси-Хэндлер с указанным прокси
            opener = urllib2.build_opener(proxy_handler)
            #Далее создается opener c созданным ранее Прокси-Хэндлером
            urllib2.install_opener(opener)
            #И наконец-то он устанавливается, теперь нет необходимости в 
            #шаманствах, все запросы в которых будет использоваться urllib2 
            #(в пределах этой функции будут направляться через указанный ранее 
            #PROXY)
        page_request = urllib2.Request(url=target_link, headers=HEADERS)
        #Создается обьект Request, который олицетворяет собой Request instance,
        #фактически это GET запрос к серверу с указанными параметрами, мне 
        #же необходимо использовать заголовки...
        try:
        #Обработка всех возможных ошибок, возникающих во время получения
        #страницы, это нехорошо, но лучше чем полное отсутствие обработки
            page = urllib2.urlopen(url=page_request).read().decode("UTF-8""replace")
            #Переменной page присваиваем прочитанное значение страницы запроса, переведенное 
            #в unicode из кодировки UTF-8 (кодировка, используемая на www.google.com) (в 
            #Python 2.6 unicode -это отдельный тип данных(!))
        except Exception ,error:
        #Сам перехват ошибки и сохранение ее значения в переменную error
            print str(error)
            #Вывод ошибки в консоль, прведварительно переведя ее в строку 
            #(просто на всякий случай)
            return "ERROR"
            #Возврат из функции в том случае, если во время работы возникла ошибка
        harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
        #Сбор со страницы поиска ссылок и title найденных страниц
        #Очистка данных от результатов поиска по блогам, картинкам и др. сервисам гугла
        for data in harvested_data:
        #Для каждого элемента массива harvested_data присвоить ему имя data
            if data[0].startswith("/"):
            #Если нулевой элемент массива data(ссылка) начинается с символа /
                harvested_data.remove(data)
                #Удаляем его из массива harvested_data
            if ".google.com" in data[0]:
            #Если нулевой элемент массива data(ссылка) имеет в себе .google.com
                harvested_data.remove(data)
                #Также удаляем его из массива harvested_data
        return harvested_data
        #Возвращаем собранные значения из функции



    Наконец-то дошла очередь до реализации основного тела приложения:
    def main():
    #Обявление функции, входных аргментов нет
        print "STARTED"
        #Вывод в консоль о начале процесса
        global THREADS_COUNT
        global DEEP
        global ENCODING
        #Обьявляние о том что эти переменные будут использоваться
        #из глобального пространства имен
        with open("requests.txt"as requests:
        #Открываем файл requests в котором находятся запросы к поисковику
             for request in requests:
             #На данном файлхэндлере доступен итератор, поэтому можно 
             #пройтись по файлу циклом, без загрузки файл в оперативку, но это 
             #тоже не важно, я все равно его туда загружу:)
                    request = request.translate(None"\r\n").decode(ENCODING, "replace")
                    #Очистка запроса от символов конца строки а также их 
                    #перевод в юникод (с заменой конфликтных символов)
                    empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
                    #Это пустой адрес страницы поиска, отформатирован
                    for i in xrange(0, DEEP, 10):
                    #Проход итератором по диапазону #чисел от 0 до DEEP, 
                    #который представляет собой максимальную глубину поиска с 
                    #шагом в 10, т.е. получаем из этого диапазона только 
                    #числа десятков, т.е. 10, 20, 30 (как идет поиск у гугла)
                         queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
                         #Добавление в очередь каждой сгенерированной ссылки
                         #и перевод её в кодировку UTF-8 (для гугла)
        for _ in xrange(THREADS_COUNT):
        #Проход циклом по диапазону чисел количества потоков
            thread_ = threading.Thread(target=worker)
            #Создается поток, target-имя функции, которая являет собой 
            #участок кода, выполняемый многопоточно
            thread_.start()
            #Вызывается метод start() , таким образом поток запускается
        while threading.active_count() >1:
        #До тех пор, пока количество активных потоков больше 1 (значит, 
        #запущенные потоки продолжают работу)
            time.sleep(1)
            #Основной поток засыпает на 1 секунду
        print "FINISHED"
        #Вывод в консоль о завершении работы приложения



    В итоге получаем нормально работающий многопоточный парсер. Естественно с многими минусами, но красиво написанное я задолбусь комментировать.

    Код:
    Эта статья + исходники: sendspace.com/file/mw0pac
    Код с русскими коментариями: dumpz.org/15202
    Код с украинскими коментариями: dumpz.org/15201

    P.S. Да я знаю, что кому-то этот пример покажется нерациональным использованием Queue (привет, cr0w). Но вот обработку ошибок проще всего делать именно, используя его.
    P.P.S. Материал не претендует на непогрешимость. Естественно, тут 100% быдлокод, никакого понимания мной того что я описываю, непонятки с терминами, я-быдлокодер и т.д. и т.п. НО тут есть то, чего вам не пересрать — оно РАБОТАЕТ, причем работает именно так, как от него ожидается, код понятен и откомментирован так, что будет понятно даже младенцу.Надеюсь, что оно хоть кому-то поможет…

    © login999
    uasc.org.ua
    Метки:
    Поделиться публикацией
    Реклама помогает поддерживать и развивать наши сервисы

    Подробнее
    Реклама
    Комментарии 39
    • 0
      Перенесите в блог по питону, когда кармы наберете (а вы наберете :) ). Хороший материал.
      • 0
        Обязательно перенесу. Спасибо! =)
        • 0
          топик перенес в блог «Язык программирования Python»!
          Спасибо за карму! =)
      • 0
        Как раз искал. Спасибо за статью.

        P.S.: Заметил странность — слово «многопоточность» в программировании встречается столько же, сколько «позиционирование» в маркетинге.
        • +3
          Насчет полезности мне трудно уже судить, поэтом побрюзжу:

          0. Зачем использовать print для вывода отладочной информации, когда есть простой и удобный logging;
          1. Threading это всеже нити. Для работы с многопоточностью есть multiprocessing;
          2. Для практического применения прокси на тредах не годится (или только при мизерных нагрузках).
          • 0
            0) print работает исключительно как индикатор
            1) Под Threading подразумевал именно нити (по поводу путаницы с терминологией я предупреждал)
            2) Что именно подразумевается под практическим применением?
            • +2
              >>> Что именно подразумевается под практическим применением?

              Много запросов. Для домашнего пользования зачастую будет проще настроить тотже squid.
            • 0
              1) Threads — это потоки, в некоторых переводах «нити». Мне много больше нравятся потоки, так как они хоть что-то означают. Путаницы с I\O потоками (те что streams) тут совсем нету. Multiprocessting — это чуть другое. Об этом можно почитать в POSIX модели процессов и потоков.
              А вообще можно переводить Threads как угодно, но учтите, что очень много программистов никогда и не слышали о нитях.
              З.Ы. А как перевести multithreading?
            • 0
              А что вы скажете про Stackless?
              • 0
                Я вам скажу за Stackless — в стаклес реализована концепция сопрограмм, а это несколько другое.
              • –3
                ааа, спасибо огромное! как раз задумался над таким парсингом. завтра перепишу парочку своих скриптов
                • 0
                  1. В 2.5 питоне тоже есть with statements: from __future__ import with_statement
                  2. Не хватает дисклеймера со словами «естественно, я понимаю, что писать многопоточный парсер google.com — затея не из лучших»

                  Кстати, работу с локами тоже проще завернуть в with ;)
                  • +1
                    Мне одному кажется что вы переборщили с комментариями? По мне так лучше пусть было бы
                    красиво написанное
                    и откомментированное в несколько раз менее объемно.
                    • 0
                      да, Вы правы, комментариев много, но ни по одной строчке кода не возникнет никаких вопросов (добивался именно этого) =)
                    • +2
                      Надо было добавить ещё код с молдавскими комментариями ))
                      • 0
                        если Вы обратите внимание на маааленькую ссылочку в самом низу статьи и перейдёте туда, то увидите что там принято писать по-украински =)
                      • 0
                        Есть вопрос. Не уверен, что автор сможет дать ответ, но может кто-то:
                        Правильно ли я понял, RLock & Lock это аналоги recursive mutex & non-recursive mutex из POSIX thread model?
                        • 0
                          судя по комментам в коде это именно оно и есть.
                        • +2
                          зачем писать global? в вашем случае это вообще ни к чему
                          • 0
                            global используется для того, чтобы не возникло никаких вопросов по поводу того что откуда берется =)
                            • 0
                              да лучше бы возникли вопросы у тех, кто не знает что и как, это же не урок по основам питона
                          • +1
                            Если претендуете на академичность кода, то вот это пятна на нём:
                            try:
                            target_link = queue.get_nowait()
                            except Exception, error:
                            return

                            for _ in xrange(THREADS_COUNT):
                            • 0
                              В первом абзаце понятно — антипаттерн «Подгузник».

                              А во втором что?
                          • +5
                            Да, вы правы. У вас 100% говонокод.
                            Что будет с вашими потоками, если отвалиться сеть, или гугл начнет отдавать 404? Они повиснут, т.к очередь никогда не отчиститься.
                            Что будет, если кончится место на диске? Потоки зависнут, т.к после взятия лока вылетит исключение на open(), и лок уже не будет освобожден.

                            То, что код работает и в нем много комментариев — не причина демонстрировать его, называя статью «основы работы с потоками».

                            Хороший пример работы с потоками — github.com/svetlyak40wt/couchdb-performance-tests/blob/master/couchdb_bulk_perf.py
                            • 0
                              Спасибо за ссылку, давно я искал чтоб было наглядно с очередью, свою писал. жесть.
                              • 0
                                о да, а если выключится компьютер, планета остановится, или того хуже — у меня закончится кофе! особенно интересно насчет окончания места на диске — вам не кажется, что в таком случае то уже и смысла нет в его дальнейшей работе как таковой (не, это не уменьшает моей быдлокодерской ошибки, но это факт). П.С. приведенный вами пример — это пример того, как я делать не буду.
                                • 0
                                  особенно интересно насчет окончания места на диске — вам не кажется, что в таком случае то уже и смысла нет в его дальнейшей работе как таковой

                                  конечно, в случае ошибки лучше наглухо зависнуть никому не сказать о причине

                                  о да, а если выключится компьютер, планета остановится, или того хуже — у меня закончится кофе!

                                  тогда и пишите во введении, что у вас пост про код с кучей концептуальных!!! багов (дедлок, это как раз концептуальный баг в многопоточной программе) а не про «понимания основных принципов работы многопоточных приложений»
                              • +3
                                Это же жесть:
                                return «ERROR»
                                if parsed_data != «ERROR»:

                                ___
                                Чем Вам BOOL помешал? или брослии бы исключение

                                Файлы и потоки это жесть номер два, я понимаю что это учебка. но изначально нужно закладывать правлиьное видение потоков, если у вас 1000 из 20000 и более потоянно.

                                по опыту скажу, что лучше «get_and_parse_page» делать не в потоке, у вас просто 100 проц. вермени будет сжираться от регекспа, т.е. я хотел сказать, что страницы нужно напарсить в потоках, тут все правльино, и дальше сделать генерарор, который будет в цикле, так вот по выходу из генератора все что накачали ( ну там 100 страниц допустим), уже делать регексп

                                вы соверенно не обрабатываете такие вещи как недоступность прокси, 403 от гугла и т.д. при такой схеме вас на 3-4 поток забанят ( пауза в 1 сек совершено бесполезна!) )( понимаю, что это пример, но вы хотябы в коментах про это отписали бы )
                                • +1
                                  Мне вот отмазка, что это мол пример, не канает. Зачем мне пример, если там все через задницу? Даже я, и то вижу, что это плохой код. А пафос, с которым статья преподнесена, усугубляет во сто крат.
                                • 0
                                  мне всегда нравились фразы типа: бла-бла, я здесь сделаю такую штуку, но вы такую штуку не делайте, я этого не рекомендую! :) зачем самому делать то что не рекомендуете?

                                  а за топик спасибо, как раз сегодня хотел посмотреть что-то по работе с потоками
                                  • 0
                                    Если вы про мой ответ, то я не рекомендовал делать в потоках регехп, так как он бесполезен в потоках, только нагружает проц, да еще если помножить на потоки, то получаеться как раз медленней, чем в прямой цикл поставить и обработать
                                    • 0
                                      А если вообще нафиг послать потоки в чтении страниц и просто асинхронно через epoll их тащить? Будет же вообще комильфо…
                                      • 0
                                        пример бы дали.
                                        • 0
                                          Попробую написать на неделе что-нибудь такое.

                                          Я как раз с месяц назад писал статью про потроха асинхронных серверов. Думаю, тот же принцип можно было бы здесь использовать.

                                          Если интересно, из питона интерфейс к асинхронным сокетам можно получить через модуль select.
                                      • 0
                                        не я об этом говорил:
                                        global queue
                                        #Здесь и далее я буду обьявлять функции из глобального пространства
                                        #имен в локальном для лучшей читабельности кода, хотя в написании
                                        #софта такое делать строго не рекомендую (!)
                                    • 0
                                      Не феншуйно читать комментарии, написанные под строкой, к которой они относятся. Обычно они сверху или в той же строке (pep8) и к этому все привыкли.

                                      • 0
                                        Как-то не очень с исключениями. Во-первых, except и сразу return может стоить вам многих часов, разбираясь почему «оно» не работает как как надо. Во-вторых, при работе с потоками (thread'ам), важно помнить две вещи — поток должен сам перехватывать свои собственные исключения и acquire/release должны быть обернуты try/finally или lock подсовывать with'у. Если write_to_file бросит исключение при открытии файла, будет и дедлок и необработанное иключение. Кстати print для иключения далеко не самый лучший индикатор, logging.exception поинформативней будет.

                                        И еще, ENCODING то используется, то не используется.
                                        • 0
                                          Добавлю свои 5 копеек: некто не запрещает разделить задачу еще больше — сделать 3 очереди и 3 типа потоков (скачка, парсинг, сохранение). Это позволит начать обработку входных данных не дожидаясь окончательной их загрузки (если потоки запустить до начала загрузки данных в очередь) и сильнее разнести части кода (например для увеличения кол-ва разработчиков). Дальше можно нитки демонизировать, что позволит оставить их болтаться до тех пор пока не закончится основной поток. Забирать данные в этом случае можно и проще и надежнее:
                                          в нитке остаётся только бесконечно крутить:
                                          while True:
                                          # забираем данные из очереди дожидаясь пока они там будут
                                          data = queueN.get()
                                          # делаем свое грязно дело
                                          do_some_work(data)
                                          # помечаем сделанную работу в очереди
                                          queue.task_done()

                                          в основной поток будет выглядеть примерно вот так:
                                          queue1 = Queue()
                                          queue2 = Queue()
                                          queue3 = Queue()

                                          # тип 1
                                          for _ in range(NUMBER):
                                          thread_ = threading.Thread(target=worker)
                                          thread_.setDaemon(True) # для питона 2.5 (в 2.6+ есть обратносовместимый метод)
                                          thread_.start()
                                          # тип 2
                                          for _ in range(NUMBER):
                                          thread_ = threading.Thread(target=worker2)
                                          thread_.setDaemon(True)
                                          thread_.start()

                                          # тип 3
                                          for _ in range(NUMBER):
                                          thread_ = threading.Thread(target=worker3)
                                          thread_.setDaemon(True)
                                          thread_.start()

                                          # тут запихиваем в очереди данные, и по мере того как очередь будет наполнятся она начнет обрабатываться

                                          # ждем когда закончатся все очереди
                                          queue1.join()
                                          queue2.join()
                                          queue3.join()

                                          Все, после того как третья очередь завершится основной поток выйдет и грохнет все висящие нитки.

                                          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.