Model aktorów – implementacja z użyciem biblioteki gevent
Cześć,
zgodnie z poprzednim wpisem – staram się pisać częściej, a skoro karp już zjedzony i popity kompotem z suszu (obu nie lubię ;) ) to można by coś napisać. Od jakiegoś czasu przymierzam się do napisania o gevencie – tj. implementacji pętli zdarzeń dla Pythona. I ten czas – nastał właśnie teraz.
Zacznijmy więc może od tego czym jest pętla zdarzeń. A może nie, zacznijmy od tego jak możemy wykorzystać wątki do zapewnienia przetwarzania wielowątkowego.
Działa więc to tak że jak przychodzi jakiś request to cała jego obsługa jest robiona w nowym wątku (lub w wątku leżącym w puli wątków) – każdy wiec taki request jest całkowicie niezależny od innego – brzmi to całkiem nieźle. Aczkolwiek może też powodować problemy – ponieważ dla każdego połączenia tworzony jest nowy wątek, co angażuje zasoby systemowe w jego stworzenie jak i w „administrację” nim – przełączanie itp. Co więcej, jeśli wątek natrafi na jakieś żądanie I/O tj. robienie selecta do bazy czy czytanie z pliku – wątek ten będzie leżał nieużywany – co powoduje marnowanie zasobów. Przy wątkach dochodzi jeszcze problem ich synchronizacji – tj takiego manewrowania nimi żeby nie zaszkodzić sobie nawzajem przy dostępnie do części wspólnych a więc – tworzenie sekcji krytycznych czy odpowiednie poziomy transakcji na bazie danych.
Jako alternatywę mamy właśnie pętle zdarzeń, której architektura wygląda tak:
wchodząc w głębie tego schematu – pętla zdarzeń pracuje na pojedynczym wątku, przez co w jednym czasie może robić tylko jedną rzecz. Żongluje ona zadaniami do wykonania, biorąc pod uwagę priorytety zadań. Jeśli zadanie zostanie odpalone przez pętle zdarzeń, ale zacznie np. czytać z bazy danych (mieć operację I/O) to takie zadanie przestanie blokować pętle i pętla zacznie wykonywać inne zadanie z kolejki – a zadanie z operacją I/O wróci do kolejki gdy ta operacja się zakończy. A więc jak widać, tutaj jednostką wykonawczą nie jest wątek. Wszystko dzieje się w jednym wątku, przez co nie ma problemów z synchronizacją, nie nakładamy na system operacyjny ciężaru obsługi kolejnych wątków a wszystko jest nadal „współbieżne”.
Modeli użycia pętli zdarzeń jest kilka – my zajmiemy się biblioteką gevent i modelem aktorów. Gevent w najprostszym użyciu wygląda tak:
1 2 3 4 5 6 7 | >>> import gevent >>> from gevent import socket >>> urls = ['www.google.com', 'www.example.com', 'www.python.org'] >>> jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] >>> gevent.joinall(jobs, timeout=2) >>> [job.value for job in jobs] ['74.125.79.106', '208.77.188.166', '82.94.164.162'] |
Odpalamy tu tyle greenletów (tak nazywa się jednostka wykonawcza w gevencie – tłumaczy się to jako „lekkie wątki”) ile jest elementów tablicy urls – i odpalamy wszystkie, czekając na wszystkie, max 2 sekundy. Każdy nasz greenlet odpala funkcję gethostbyname dla zadanego adresu url.
My się pobawimy troszkę inaczej, tak jak pisałem wcześniej – zaimplementujemy wzorzec aktorów.
Wzorzec ten tworzy abstrakcję – Aktora. Jest to jednostka wykonawcza której stan jest modyfikowany tylko i wyłączenie przez nią samą. Komunikuje się innymi aktorami poprzez wiadomości – a wiec aktor może odbierać wiadomości i wysyłać je. No i rzecz jasna – coś z nimi robić. Skrzynka odbiorcza aktora jest kolejką – a więc jeden aktor może w jednym momencie przetwarzać jedną wiadomość – reszta jest kolejkowana.
Zbudujemy system przetwarzania obrazów oparty o model aktorów. Będziemy mieli dwóch specjalnych aktorów, tj. aktora który stanowi wejście do systemu i taki który stanowi wyjście z systemu. Pomiędzy nimi będzie n aktorów którzy będą dokładać swoje cegiełki do przetwarzanego obrazu. Więc pomysł działania jest prosty:
Aktor Startowy (odbiera jaki plik przetworzyć i jakie efekty na niego nałożyć)
.. przetwarzanie
.. przetwarzanie
Aktor końcowy (zapisuje obraz z nałożonymi wcześniej efektami do pliku).
Zacznijmy więc od napisania klasy po której będą dziedziczyli aktorzy:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import gevent from gevent.queue import Queue from gevent.local import local from exceptions import NotImplementedError from manager.manager import manager class AbstractActor(gevent.Greenlet): def __init__(self): self.inbox = Queue() print 'Creating %r ...' % self gevent.Greenlet.__init__(self) def process(self, args): raise NotImplementedError() def receive(self, message): arguments = message['args'] path = message['path'] if len(path) > 1: path.pop(0) next_step = path[0] else: next_step = None print 'Processing in %r, next will be %r' % (self, next_step) args = self.process(arguments) if args: manager.go_next(next_step, { 'args': args, 'path': path }) def _run(self): self.running = True print 'Waiting in %r' % self while self.running: message = self.inbox.get() self.receive(message) |
Jak widać aktor będzie dziedziczył pośrednio po klasie Greenlet – dzięki czemu będzie mógł zostać dodany do pętli zdarzeń. Jego główna funkcja która odpali się od razu po jego uruchomieniu to funkcja _run() – przeciążamy ją i w pętli czekamy na wiadomości. Jeśli wiadomość nadejdzie – przekazujemy ją do funkcji receive() która podzieli wiadomość na części, wykona funkcję process(), którą nadpiszemy dla każdego konkretnego aktora i przekażemy info o zakończeniu przetwarzana do managera pętli.
Manager to nasza klasa która będzie trzymać pieczę nad tym by komunikaty były przesyłane do właściwych aktorów:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import random import gevent class Manager(): def __init__(self): self.dictionary = dict() def add(self, obj): class_name = obj.__class__.__name__ if self.dictionary.has_key(class_name): self.dictionary[class_name].append(obj) else: self.dictionary[class_name] = [obj] def get_next_actor(self, name): if not self.dictionary.has_key(name): print 'Actor %s not found!' % name name = 'FinishActor' return random.choice(self.dictionary[name]) def go_next(self, next_path, args): actor_instance = self.get_next_actor(next_path) print 'Putting into %r' % actor_instance actor_instance.inbox.put(args) gevent.idle() manager = Manager() |
jak widać – posiada ona słownik ze wszystkimi aktorami i po podanych argumentach wybiera kolejnego aktora, wrzucając mu do skrzynki wiadomość. Zauważ proszę że jeśli aktor nie zostanie znaleziony – przeskakujemy do aktora końcowego oraz jeśli mamy więcej niż jednego aktora o tej samej nazwie – wybrany zostanie losowy.
Jako że aktor startowy nie posiada własnej skrzynki odbiorczej, czyli w sumie to nie jest aktorem w rozumieniu tego wzorca – nie bedzie dziedziczył po klasie AbstractActor, zbudujemy dla niego jej minimalistyczną formę:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | class StartActor(gevent.Greenlet): def __init__(self): gevent.Greenlet.__init__(self) # self.prepare_input() def get_path(self): """ Define in your subclass. """ raise NotImplementedError() def prepare_input(self): """ Define in your subclass. """ raise NotImplementedError() def _run(self): self.prepare_input() |
I zaimplementujemy już naszego aktor startowego:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | class RedisActor(StartActor): CHANNEL = 'tasks' SEPARATOR = '_' PATH_SEPARATOR = '=>' def __init__(self): self.connection = redis_connector.StrictRedis(host='localhost', port=6379, db=0) self.subscriber = self.connection.pubsub(ignore_subscribe_messages=True) self.path = '' StartActor.__init__(self) def get_path(self): return self.path.split(self.PATH_SEPARATOR) def handle(self, msg): arguments, path = msg.split(self.SEPARATOR) arguments = self.connection.get(arguments) path = self.connection.get(path) if not path or not arguments: return self.path = path parsed_path = self.get_path() im = Image.open(arguments) manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im}) def prepare_input(self): self.subscriber.subscribe(self.CHANNEL) for msg in self.subscriber.listen(): self.handle(msg['data']) |
Jak widać, jako interfejsu wejściowego użyłem bazy noSql’owej – Redis. Nasłuchuje ona na kanał „tasks” gdzie dostaje stringa w formacie xxxx_yyyy gdzie xxxx to klucz pod który znajduje się ścieżka do pliku który ma być przetworzony a pod kluczem yyyy jest ścieżka przetwarzania w formacie AAA=>BBB. Całość tego dzieje się w funkcji
1 2 3 4 5 6 7 8 9 10 11 | def handle(self, msg): arguments, path = msg.split(self.SEPARATOR) arguments = self.connection.get(arguments) path = self.connection.get(path) if not path or not arguments: return self.path = path parsed_path = self.get_path() im = Image.open(arguments) manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im}) |
zauważ proszę że do kolejnego aktora nie jest wysyłany obiekt z ścieżką pliku a obiekt z już uchwytem do otwartego pliku. Wyżej pokazany manager bierze tablice przetwarzania w formacie [’AAA’, 'BBB’] dla argumentu 'AAA=>BBB’, wycina element z góry stosu, przekazuje pozostałą część do wyciętego Aktora, ten przetwarza i cykl się powtarza.
Stworzyłem sobie dwóch aktorów przetwarzajacych:
1 2 3 4 5 6 7 | from ..abstract_actor import AbstractActor from PIL import ImageFilter class Blur(AbstractActor): def process(self, img): blurred = img.filter(ImageFilter.BLUR) return blurred |
1 2 3 4 5 6 7 | from ..abstract_actor import AbstractActor from PIL import ImageFilter class Detail(AbstractActor): def process(self, img): detailed = img.filter(ImageFilter.DETAIL) return detailed |
odpowiednio – efekt rozmycia i wyostrzenia.
Na koniec jeszcze aktor kończący:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from ..abstract_actor import AbstractActor import string, random class FinishActor(AbstractActor): def random_generator(self, size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) def process(self, img): print 'Saving in format %r .... ' % 'jpg' filename = '/home/mmazurek/' + self.random_generator() + '.jpg' img.save(filename, img.format) print 'Saved in %r' % filename |
I zepnijmy wszystko do kupy :)
Plik main.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import gevent from gevent.monkey import patch_all from actors.abstract_actor import AbstractActor from actors.finish_actors import * from actors.initial_actors import * from actors.processing_actors import * ABSTRACT_ACTORS = ['StartActor'] def inheritors(klass): subclasses = [] work = klass while work: parent = work.pop() for child in parent.__subclasses__(): if child not in subclasses: if child.__name__ not in ABSTRACT_ACTORS: subclasses.append(child) work.append(child) return subclasses def setup(): all_actors = inheritors([AbstractActor, StartActor]) all_actors_instances = [actor() for actor in all_actors] to_join = [] for actor in all_actors_instances: actor.start() manager.add(actor) to_join.append(actor) print 'Created %r' % to_join gevent.joinall(to_join) patch_all() setup() |
Funkcja inheritors znajdzie nam wszystkie klasy dziedziczące po „AbstractActor” i „StartActor”. Taki prosty autodiscovery. Dla każdej znalezionej klasy tworzymy jej instancję, każdą startujemy (dodajemy do pętli zdarzeń), rejestrujemy do managera tak żeby była przez niego widoczna i odpalamy „joinall” – czekając na wszystkie. Zobaczmy jak to działa.
Obrazek wejściowy:
i efekt dla „Blur=>Detail”:
Logi tak wyglądają:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | Creating <FinishActor at 0x7ff071118190> ... Creating <Blur at 0x7ff071118230> ... Creating <Detail at 0x7ff0711182d0> ... Created [<RedisActor at 0x7ff0724dc690>, <FinishActor at 0x7ff071118190>, <Blur at 0x7ff071118230>, <Detail at 0x7 ff0711182d0>] Putting into <Blur at 0x7ff071118230> Waiting in <FinishActor at 0x7ff071118190> Waiting in <Blur at 0x7ff071118230> Processing in <Blur at 0x7ff071118230>, next will be 'Detail' Putting into <Detail at 0x7ff0711182d0> Waiting in <Detail at 0x7ff0711182d0> Processing in <Detail at 0x7ff0711182d0>, next will be None Actor None not found! Putting into <FinishActor at 0x7ff071118190> Processing in <FinishActor at 0x7ff071118190>, next will be None Saving in format 'jpg' .... Saved in '/home/mmazurek/V5K02J.jpg' |
Dla Blur=>Blur=>Blur=>Blur:
I dla np. Detail=>Blur=>Detail=>Detail=>Detail
Oczywiście w trakcie używania takiego systemu możemy dojść do wniosku że któraś z warstw (aktorów) ma większe obciążenie, w sensie przetwarzanie w niej trwa dłużej. Można wtedy dołożyć kolejną instancję już istniejącego Aktora:
1 | all_actors_instances.append(Blur()) |
w funkcji setup() przed pętlą odpalającą greenlety.
Spowoduje to lekką zmianę w logach powitalnych:
1 2 3 4 5 6 7 8 9 | Creating <FinishActor at 0x7f17aee32d70> ... Creating <Blur at 0x7f17aee32cd0> ... Creating <Detail at 0x7f17aee32eb0> ... Creating <Blur at 0x7f17aee32e10> ... Created [<RedisActor at 0x7f17aee32c30>, <FinishActor at 0x7f17aee32d70>, <Blur at 0x7f17aee32cd0>, <Detail at 0x7f17aee32eb0>, <Blur at 0x7f17aee32e10>] Waiting in <FinishActor at 0x7f17aee32d70> Waiting in <Blur at 0x7f17aee32cd0> Waiting in <Detail at 0x7f17aee32eb0> Waiting in <Blur at 0x7f17aee32e10> |
Widać po prostu że dodatkowy greenlet został uruchomiony. Rozróżnić można je po adresie pamięci. Spróbujmy odpalić system dla danych Blur=>Blur=>Blur:
1 2 3 4 5 6 7 8 9 10 11 | Putting into <Blur at 0x7f17aee32cd0> Processing in <Blur at 0x7f17aee32cd0>, next will be 'Blur' Putting into <Blur at 0x7f17aee32e10> Processing in <Blur at 0x7f17aee32e10>, next will be 'Blur' Putting into <Blur at 0x7f17aee32e10> Processing in <Blur at 0x7f17aee32e10>, next will be None Actor None not found! Putting into <FinishActor at 0x7f17aee32d70> Processing in <FinishActor at 0x7f17aee32d70>, next will be None Saving in format 'jpg' .... Saved in '/home/mmazurek/V14113.jpg' |
I można zauważyć że raz obrazek przetwarza greenlet o adresie z końcówką cd0 a raz e10. Oczywiście nie jest to jakiś bardzo dobry sposób na wybór docelowego Aktora – można by pokusić się tutaj o np. jakiś loadballancing albo chociaż rotację liniową.
Warto dostrzec też że zbudowaliśmy architekturę potokową. Tzn podzieliliśmy jakiś proces na etapy i poszczególne etapy, mimo iż są zależne od siebie, to mogą przetwarzać inne zadania, tzn że jeśli zadanie #1 dojdzie do 3 etapu przetwarzania to drugi etap może już działać na zadaniu #2 a pierwszy na #3.
Oczywiście ten system jest pokazowy – nie do zastosowań produkcyjnych, brakuje mu np obsługi wyjątków, brak podnoszenia greenletów np. bo błędzie czy jakiejkolwiek odporności na awarie – w tym momencie wyjątek w którymkolwiek etapie blokuje pętle zdarzeń.
Może warto odpalić wiele pętli zdarzeń, każda na osobnym procesie?
Kombinować zawsze warto :)
Mateusz Mazurek