Cześć,
zgodnie z poprzednim wpisem – staram się pisać częściej, a skoro karp już zjedzony i popity kompotem z suszu (obu nie lubię ;) ) to można by coś napisać. Od jakiegoś czasu przymierzam się do napisania o gevencie – tj. implementacji pętli zdarzeń dla Pythona. I ten czas – nastał właśnie teraz.
Zacznijmy więc może od tego czym jest pętla zdarzeń. A może nie, zacznijmy od tego jak możemy wykorzystać wątki do zapewnienia przetwarzania wielowątkowego.
Działa więc to tak że jak przychodzi jakiś request to cała jego obsługa jest robiona w nowym wątku (lub w wątku leżącym w puli wątków) – każdy wiec taki request jest całkowicie niezależny od innego – brzmi to całkiem nieźle. Aczkolwiek może też powodować problemy – ponieważ dla każdego połączenia tworzony jest nowy wątek, co angażuje zasoby systemowe w jego stworzenie jak i w „administrację” nim – przełączanie itp. Co więcej, jeśli wątek natrafi na jakieś żądanie I/O tj. robienie selecta do bazy czy czytanie z pliku – wątek ten będzie leżał nieużywany – co powoduje marnowanie zasobów. Przy wątkach dochodzi jeszcze problem ich synchronizacji – tj takiego manewrowania nimi żeby nie zaszkodzić sobie nawzajem przy dostępnie do części wspólnych a więc – tworzenie sekcji krytycznych czy odpowiednie poziomy transakcji na bazie danych.
Jako alternatywę mamy właśnie pętle zdarzeń, której architektura wygląda tak:
wchodząc w głębie tego schematu – pętla zdarzeń pracuje na pojedynczym wątku, przez co w jednym czasie może robić tylko jedną rzecz. Żongluje ona zadaniami do wykonania, biorąc pod uwagę priorytety zadań. Jeśli zadanie zostanie odpalone przez pętle zdarzeń, ale zacznie np. czytać z bazy danych (mieć operację I/O) to takie zadanie przestanie blokować pętle i pętla zacznie wykonywać inne zadanie z kolejki – a zadanie z operacją I/O wróci do kolejki gdy ta operacja się zakończy. A więc jak widać, tutaj jednostką wykonawczą nie jest wątek. Wszystko dzieje się w jednym wątku, przez co nie ma problemów z synchronizacją, nie nakładamy na system operacyjny ciężaru obsługi kolejnych wątków a wszystko jest nadal „współbieżne”.
Modeli użycia pętli zdarzeń jest kilka – my zajmiemy się biblioteką gevent i modelem aktorów. Gevent w najprostszym użyciu wygląda tak:
1 2 3 4 5 6 7 | >>> import gevent >>> from gevent import socket >>> urls = ['www.google.com', 'www.example.com', 'www.python.org'] >>> jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] >>> gevent.joinall(jobs, timeout=2) >>> [job.value for job in jobs] ['74.125.79.106', '208.77.188.166', '82.94.164.162'] |
Odpalamy tu tyle greenletów (tak nazywa się jednostka wykonawcza w gevencie – tłumaczy się to jako „lekkie wątki”) ile jest elementów tablicy urls – i odpalamy wszystkie, czekając na wszystkie, max 2 sekundy. Każdy nasz greenlet odpala funkcję gethostbyname dla zadanego adresu url.
My się pobawimy troszkę inaczej, tak jak pisałem wcześniej – zaimplementujemy wzorzec aktorów.
Wzorzec ten tworzy abstrakcję – Aktora. Jest to jednostka wykonawcza której stan jest modyfikowany tylko i wyłączenie przez nią samą. Komunikuje się innymi aktorami poprzez wiadomości – a wiec aktor może odbierać wiadomości i wysyłać je. No i rzecz jasna – coś z nimi robić. Skrzynka odbiorcza aktora jest kolejką – a więc jeden aktor może w jednym momencie przetwarzać jedną wiadomość – reszta jest kolejkowana.
Zbudujemy system przetwarzania obrazów oparty o model aktorów. Będziemy mieli dwóch specjalnych aktorów, tj. aktora który stanowi wejście do systemu i taki który stanowi wyjście z systemu. Pomiędzy nimi będzie n aktorów którzy będą dokładać swoje cegiełki do przetwarzanego obrazu. Więc pomysł działania jest prosty:
Aktor Startowy (odbiera jaki plik przetworzyć i jakie efekty na niego nałożyć)
.. przetwarzanie
.. przetwarzanie
Aktor końcowy (zapisuje obraz z nałożonymi wcześniej efektami do pliku).
Zacznijmy więc od napisania klasy po której będą dziedziczyli aktorzy:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import gevent from gevent.queue import Queue from gevent.local import local from exceptions import NotImplementedError from manager.manager import manager class AbstractActor(gevent.Greenlet): def __init__(self): self.inbox = Queue() print 'Creating %r ...' % self gevent.Greenlet.__init__(self) def process(self, args): raise NotImplementedError() def receive(self, message): arguments = message['args'] path = message['path'] if len(path) > 1: path.pop(0) next_step = path[0] else: next_step = None print 'Processing in %r, next will be %r' % (self, next_step) args = self.process(arguments) if args: manager.go_next(next_step, { 'args': args, 'path': path }) def _run(self): self.running = True print 'Waiting in %r' % self while self.running: message = self.inbox.get() self.receive(message) |
Jak widać aktor będzie dziedziczył pośrednio po klasie Greenlet – dzięki czemu będzie mógł zostać dodany do pętli zdarzeń. Jego główna funkcja która odpali się od razu po jego uruchomieniu to funkcja _run() – przeciążamy ją i w pętli czekamy na wiadomości. Jeśli wiadomość nadejdzie – przekazujemy ją do funkcji receive() która podzieli wiadomość na części, wykona funkcję process(), którą nadpiszemy dla każdego konkretnego aktora i przekażemy info o zakończeniu przetwarzana do managera pętli.
Manager to nasza klasa która będzie trzymać pieczę nad tym by komunikaty były przesyłane do właściwych aktorów:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import random import gevent class Manager(): def __init__(self): self.dictionary = dict() def add(self, obj): class_name = obj.__class__.__name__ if self.dictionary.has_key(class_name): self.dictionary[class_name].append(obj) else: self.dictionary[class_name] = [obj] def get_next_actor(self, name): if not self.dictionary.has_key(name): print 'Actor %s not found!' % name name = 'FinishActor' return random.choice(self.dictionary[name]) def go_next(self, next_path, args): actor_instance = self.get_next_actor(next_path) print 'Putting into %r' % actor_instance actor_instance.inbox.put(args) gevent.idle() manager = Manager() |
jak widać – posiada ona słownik ze wszystkimi aktorami i po podanych argumentach wybiera kolejnego aktora, wrzucając mu do skrzynki wiadomość. Zauważ proszę że jeśli aktor nie zostanie znaleziony – przeskakujemy do aktora końcowego oraz jeśli mamy więcej niż jednego aktora o tej samej nazwie – wybrany zostanie losowy.
Jako że aktor startowy nie posiada własnej skrzynki odbiorczej, czyli w sumie to nie jest aktorem w rozumieniu tego wzorca – nie bedzie dziedziczył po klasie AbstractActor, zbudujemy dla niego jej minimalistyczną formę:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | class StartActor(gevent.Greenlet): def __init__(self): gevent.Greenlet.__init__(self) # self.prepare_input() def get_path(self): """ Define in your subclass. """ raise NotImplementedError() def prepare_input(self): """ Define in your subclass. """ raise NotImplementedError() def _run(self): self.prepare_input() |
I zaimplementujemy już naszego aktor startowego:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | class RedisActor(StartActor): CHANNEL = 'tasks' SEPARATOR = '_' PATH_SEPARATOR = '=>' def __init__(self): self.connection = redis_connector.StrictRedis(host='localhost', port=6379, db=0) self.subscriber = self.connection.pubsub(ignore_subscribe_messages=True) self.path = '' StartActor.__init__(self) def get_path(self): return self.path.split(self.PATH_SEPARATOR) def handle(self, msg): arguments, path = msg.split(self.SEPARATOR) arguments = self.connection.get(arguments) path = self.connection.get(path) if not path or not arguments: return self.path = path parsed_path = self.get_path() im = Image.open(arguments) manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im}) def prepare_input(self): self.subscriber.subscribe(self.CHANNEL) for msg in self.subscriber.listen(): self.handle(msg['data']) |
Jak widać, jako interfejsu wejściowego użyłem bazy noSql’owej – Redis. Nasłuchuje ona na kanał „tasks” gdzie dostaje stringa w formacie xxxx_yyyy gdzie xxxx to klucz pod który znajduje się ścieżka do pliku który ma być przetworzony a pod kluczem yyyy jest ścieżka przetwarzania w formacie AAA=>BBB. Całość tego dzieje się w funkcji
1 2 3 4 5 6 7 8 9 10 11 | def handle(self, msg): arguments, path = msg.split(self.SEPARATOR) arguments = self.connection.get(arguments) path = self.connection.get(path) if not path or not arguments: return self.path = path parsed_path = self.get_path() im = Image.open(arguments) manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im}) |
zauważ proszę że do kolejnego aktora nie jest wysyłany obiekt z ścieżką pliku a obiekt z już uchwytem do otwartego pliku. Wyżej pokazany manager bierze tablice przetwarzania w formacie [’AAA’, 'BBB’] dla argumentu 'AAA=>BBB’, wycina element z góry stosu, przekazuje pozostałą część do wyciętego Aktora, ten przetwarza i cykl się powtarza.
Stworzyłem sobie dwóch aktorów przetwarzajacych:
1 2 3 4 5 6 7 | from ..abstract_actor import AbstractActor from PIL import ImageFilter class Blur(AbstractActor): def process(self, img): blurred = img.filter(ImageFilter.BLUR) return blurred |
1 2 3 4 5 6 7 | from ..abstract_actor import AbstractActor from PIL import ImageFilter class Detail(AbstractActor): def process(self, img): detailed = img.filter(ImageFilter.DETAIL) return detailed |
odpowiednio – efekt rozmycia i wyostrzenia.
Na koniec jeszcze aktor kończący:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from ..abstract_actor import AbstractActor import string, random class FinishActor(AbstractActor): def random_generator(self, size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) def process(self, img): print 'Saving in format %r .... ' % 'jpg' filename = '/home/mmazurek/' + self.random_generator() + '.jpg' img.save(filename, img.format) print 'Saved in %r' % filename |
I zepnijmy wszystko do kupy :)
Plik main.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import gevent from gevent.monkey import patch_all from actors.abstract_actor import AbstractActor from actors.finish_actors import * from actors.initial_actors import * from actors.processing_actors import * ABSTRACT_ACTORS = ['StartActor'] def inheritors(klass): subclasses = [] work = klass while work: parent = work.pop() for child in parent.__subclasses__(): if child not in subclasses: if child.__name__ not in ABSTRACT_ACTORS: subclasses.append(child) work.append(child) return subclasses def setup(): all_actors = inheritors([AbstractActor, StartActor]) all_actors_instances = [actor() for actor in all_actors] to_join = [] for actor in all_actors_instances: actor.start() manager.add(actor) to_join.append(actor) print 'Created %r' % to_join gevent.joinall(to_join) patch_all() setup() |
Funkcja inheritors znajdzie nam wszystkie klasy dziedziczące po „AbstractActor” i „StartActor”. Taki prosty autodiscovery. Dla każdej znalezionej klasy tworzymy jej instancję, każdą startujemy (dodajemy do pętli zdarzeń), rejestrujemy do managera tak żeby była przez niego widoczna i odpalamy „joinall” – czekając na wszystkie. Zobaczmy jak to działa.
Obrazek wejściowy:
i efekt dla „Blur=>Detail”:
Logi tak wyglądają:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | Creating <FinishActor at 0x7ff071118190> ... Creating <Blur at 0x7ff071118230> ... Creating <Detail at 0x7ff0711182d0> ... Created [<RedisActor at 0x7ff0724dc690>, <FinishActor at 0x7ff071118190>, <Blur at 0x7ff071118230>, <Detail at 0x7 ff0711182d0>] Putting into <Blur at 0x7ff071118230> Waiting in <FinishActor at 0x7ff071118190> Waiting in <Blur at 0x7ff071118230> Processing in <Blur at 0x7ff071118230>, next will be 'Detail' Putting into <Detail at 0x7ff0711182d0> Waiting in <Detail at 0x7ff0711182d0> Processing in <Detail at 0x7ff0711182d0>, next will be None Actor None not found! Putting into <FinishActor at 0x7ff071118190> Processing in <FinishActor at 0x7ff071118190>, next will be None Saving in format 'jpg' .... Saved in '/home/mmazurek/V5K02J.jpg' |
Dla Blur=>Blur=>Blur=>Blur:
I dla np. Detail=>Blur=>Detail=>Detail=>Detail
Oczywiście w trakcie używania takiego systemu możemy dojść do wniosku że któraś z warstw (aktorów) ma większe obciążenie, w sensie przetwarzanie w niej trwa dłużej. Można wtedy dołożyć kolejną instancję już istniejącego Aktora:
1 | all_actors_instances.append(Blur()) |
w funkcji setup() przed pętlą odpalającą greenlety.
Spowoduje to lekką zmianę w logach powitalnych:
1 2 3 4 5 6 7 8 9 | Creating <FinishActor at 0x7f17aee32d70> ... Creating <Blur at 0x7f17aee32cd0> ... Creating <Detail at 0x7f17aee32eb0> ... Creating <Blur at 0x7f17aee32e10> ... Created [<RedisActor at 0x7f17aee32c30>, <FinishActor at 0x7f17aee32d70>, <Blur at 0x7f17aee32cd0>, <Detail at 0x7f17aee32eb0>, <Blur at 0x7f17aee32e10>] Waiting in <FinishActor at 0x7f17aee32d70> Waiting in <Blur at 0x7f17aee32cd0> Waiting in <Detail at 0x7f17aee32eb0> Waiting in <Blur at 0x7f17aee32e10> |
Widać po prostu że dodatkowy greenlet został uruchomiony. Rozróżnić można je po adresie pamięci. Spróbujmy odpalić system dla danych Blur=>Blur=>Blur:
1 2 3 4 5 6 7 8 9 10 11 | Putting into <Blur at 0x7f17aee32cd0> Processing in <Blur at 0x7f17aee32cd0>, next will be 'Blur' Putting into <Blur at 0x7f17aee32e10> Processing in <Blur at 0x7f17aee32e10>, next will be 'Blur' Putting into <Blur at 0x7f17aee32e10> Processing in <Blur at 0x7f17aee32e10>, next will be None Actor None not found! Putting into <FinishActor at 0x7f17aee32d70> Processing in <FinishActor at 0x7f17aee32d70>, next will be None Saving in format 'jpg' .... Saved in '/home/mmazurek/V14113.jpg' |
I można zauważyć że raz obrazek przetwarza greenlet o adresie z końcówką cd0 a raz e10. Oczywiście nie jest to jakiś bardzo dobry sposób na wybór docelowego Aktora – można by pokusić się tutaj o np. jakiś loadballancing albo chociaż rotację liniową.
Warto dostrzec też że zbudowaliśmy architekturę potokową. Tzn podzieliliśmy jakiś proces na etapy i poszczególne etapy, mimo iż są zależne od siebie, to mogą przetwarzać inne zadania, tzn że jeśli zadanie #1 dojdzie do 3 etapu przetwarzania to drugi etap może już działać na zadaniu #2 a pierwszy na #3.
Oczywiście ten system jest pokazowy – nie do zastosowań produkcyjnych, brakuje mu np obsługi wyjątków, brak podnoszenia greenletów np. bo błędzie czy jakiejkolwiek odporności na awarie – w tym momencie wyjątek w którymkolwiek etapie blokuje pętle zdarzeń.
Może warto odpalić wiele pętli zdarzeń, każda na osobnym procesie?
Kombinować zawsze warto :)
Oj daaawnoo mnie tu nie było. Ale wakacje to był czas dużej liczby intensywnych wyjazdów i tak naprawdę, dopiero jakoś… Read More
Cześć! Zapraszam na krótkie podsumowanie kwietnia. Wyjazd do Niemiec A dokładniej pod granicę z Francją. Chrześnica miała pierwszą komunię. Po… Read More
Ostatnio tygodnie były tak bardzo wypełnione, że nie udało mi się napisać nawet krótkiego podsumowanie. Więc dziś zbiorczo podsumuję luty… Read More
Zapraszam na krótkie podsumowanie miesiąca. Książki W styczniu przeczytałem "Homo Deus: Historia jutra". Książka łudząco podoba do wcześniejszej książki tego… Read More
Cześć! Zapraszam na podsumowanie roku 2023. Książki Zacznijmy od książek. W tym roku cel 35 książek nie został osiągnięty. Niemniej… Read More
Zapraszam na krótkie podsumowanie miesiąca. Książki W grudniu skończyłem czytać Mein Kampf. Nudna książka. Ciekawsze fragmenty można by było streścić… Read More