Model aktorów – implementacja z użyciem biblioteki gevent

Cześć,
zgodnie z poprzednim wpisem – staram się pisać częściej, a skoro karp już zjedzony i popity kompotem z suszu (obu nie lubię ;) ) to można by coś napisać. Od jakiegoś czasu przymierzam się do napisania o gevencie – tj. implementacji pętli zdarzeń dla Pythona. I ten czas – nastał właśnie teraz.

Zacznijmy więc może od tego czym jest pętla zdarzeń. A może nie, zacznijmy od tego jak możemy wykorzystać wątki do zapewnienia przetwarzania wielowątkowego.
Działa więc to tak że jak przychodzi jakiś request to cała jego obsługa jest robiona w nowym wątku (lub w wątku leżącym w puli wątków) – każdy wiec taki request jest całkowicie niezależny od innego – brzmi to całkiem nieźle. Aczkolwiek może też powodować problemy – ponieważ dla każdego połączenia tworzony jest nowy wątek, co angażuje zasoby systemowe w jego stworzenie jak i w „administrację” nim – przełączanie itp. Co więcej, jeśli wątek natrafi na jakieś żądanie I/O tj. robienie selecta do bazy czy czytanie z pliku – wątek ten będzie leżał nieużywany – co powoduje marnowanie zasobów. Przy wątkach dochodzi jeszcze problem ich synchronizacji – tj takiego manewrowania nimi żeby nie zaszkodzić sobie nawzajem przy dostępnie do części wspólnych a więc – tworzenie sekcji krytycznych czy odpowiednie poziomy transakcji na bazie danych.

Jako alternatywę mamy właśnie pętle zdarzeń, której architektura wygląda tak:

wchodząc w głębie tego schematu – pętla zdarzeń pracuje na pojedynczym wątku, przez co w jednym czasie może robić tylko jedną rzecz. Żongluje ona zadaniami do wykonania, biorąc pod uwagę priorytety zadań. Jeśli zadanie zostanie odpalone przez pętle zdarzeń, ale zacznie np. czytać z bazy danych (mieć operację I/O) to takie zadanie przestanie blokować pętle i pętla zacznie wykonywać inne zadanie z kolejki – a zadanie z operacją I/O wróci do kolejki gdy ta operacja się zakończy. A więc jak widać, tutaj jednostką wykonawczą nie jest wątek. Wszystko dzieje się w jednym wątku, przez co nie ma problemów z synchronizacją, nie nakładamy na system operacyjny ciężaru obsługi kolejnych wątków a wszystko jest nadal „współbieżne”.

Modeli użycia pętli zdarzeń jest kilka – my zajmiemy się biblioteką gevent i modelem aktorów. Gevent w najprostszym użyciu wygląda tak:

1
2
3
4
5
6
7
>>> import gevent
>>> from gevent import socket
>>> urls = ['www.google.com', 'www.example.com', 'www.python.org']
>>> jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
>>> gevent.joinall(jobs, timeout=2)
>>> [job.value for job in jobs]
['74.125.79.106', '208.77.188.166', '82.94.164.162']

Odpalamy tu tyle greenletów (tak nazywa się jednostka wykonawcza w gevencie – tłumaczy się to jako „lekkie wątki”) ile jest elementów tablicy urls – i odpalamy wszystkie, czekając na wszystkie, max 2 sekundy. Każdy nasz greenlet odpala funkcję gethostbyname dla zadanego adresu url.

My się pobawimy troszkę inaczej, tak jak pisałem wcześniej – zaimplementujemy wzorzec aktorów.

Wzorzec ten tworzy abstrakcję – Aktora. Jest to jednostka wykonawcza której stan jest modyfikowany tylko i wyłączenie przez nią samą. Komunikuje się innymi aktorami poprzez wiadomości – a wiec aktor może odbierać wiadomości i wysyłać je. No i rzecz jasna – coś z nimi robić. Skrzynka odbiorcza aktora jest kolejką – a więc jeden aktor może w jednym momencie przetwarzać jedną wiadomość – reszta jest kolejkowana.

Zbudujemy system przetwarzania obrazów oparty o model aktorów. Będziemy mieli dwóch specjalnych aktorów, tj. aktora który stanowi wejście do systemu i taki który stanowi wyjście z systemu. Pomiędzy nimi będzie n aktorów którzy będą dokładać swoje cegiełki do przetwarzanego obrazu. Więc pomysł działania jest prosty:

Aktor Startowy (odbiera jaki plik przetworzyć i jakie efekty na niego nałożyć)
.. przetwarzanie
.. przetwarzanie
Aktor końcowy (zapisuje obraz z nałożonymi wcześniej efektami do pliku).

Zacznijmy więc od napisania klasy po której będą dziedziczyli aktorzy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import gevent
from gevent.queue import Queue
from gevent.local import local
from exceptions import NotImplementedError
from manager.manager import manager

class AbstractActor(gevent.Greenlet):

  def __init__(self):
      self.inbox = Queue()
      print 'Creating %r ...' % self
      gevent.Greenlet.__init__(self)

  def process(self, args):
      raise NotImplementedError()

  def receive(self, message):  
      arguments = message['args']
      path = message['path']
      if len(path) > 1:
          path.pop(0)
          next_step = path[0]
      else:
          next_step = None

      print 'Processing in %r, next will be %r' % (self, next_step)
      args = self.process(arguments)
      if args:
          manager.go_next(next_step, {
              'args': args,
              'path': path
          })

  def _run(self):
      self.running = True
      print 'Waiting in %r' % self
      while self.running:
          message = self.inbox.get()
          self.receive(message)

Jak widać aktor będzie dziedziczył pośrednio po klasie Greenlet – dzięki czemu będzie mógł zostać dodany do pętli zdarzeń. Jego główna funkcja która odpali się od razu po jego uruchomieniu to funkcja _run() – przeciążamy ją i w pętli czekamy na wiadomości. Jeśli wiadomość nadejdzie – przekazujemy ją do funkcji receive() która podzieli wiadomość na części, wykona funkcję process(), którą nadpiszemy dla każdego konkretnego aktora i przekażemy info o zakończeniu przetwarzana do managera pętli.

Manager to nasza klasa która będzie trzymać pieczę nad tym by komunikaty były przesyłane do właściwych aktorów:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import random
import gevent

class Manager():

  def __init__(self):
      self.dictionary = dict()

  def add(self, obj):
      class_name = obj.__class__.__name__
      if self.dictionary.has_key(class_name):
          self.dictionary[class_name].append(obj)
      else:
          self.dictionary[class_name] = [obj]

  def get_next_actor(self, name):
      if not self.dictionary.has_key(name):
          print 'Actor %s not found!' % name
          name = 'FinishActor'
      return random.choice(self.dictionary[name])

  def go_next(self, next_path, args):
      actor_instance = self.get_next_actor(next_path)
      print 'Putting into  %r' % actor_instance
      actor_instance.inbox.put(args)
      gevent.idle()

manager = Manager()

jak widać – posiada ona słownik ze wszystkimi aktorami i po podanych argumentach wybiera kolejnego aktora, wrzucając mu do skrzynki wiadomość. Zauważ proszę że jeśli aktor nie zostanie znaleziony – przeskakujemy do aktora końcowego oraz jeśli mamy więcej niż jednego aktora o tej samej nazwie – wybrany zostanie losowy.

Jako że aktor startowy nie posiada własnej skrzynki odbiorczej, czyli w sumie to nie jest aktorem w rozumieniu tego wzorca – nie bedzie dziedziczył po klasie AbstractActor, zbudujemy dla niego jej minimalistyczną formę:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class StartActor(gevent.Greenlet):

  def __init__(self):
      gevent.Greenlet.__init__(self)
      # self.prepare_input()

  def get_path(self):
      """
      Define in your subclass.
      """

      raise NotImplementedError()

  def prepare_input(self):
      """
      Define in your subclass.
      """

      raise NotImplementedError()

  def _run(self):
      self.prepare_input()

I zaimplementujemy już naszego aktor startowego:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class RedisActor(StartActor):

  CHANNEL = 'tasks'
  SEPARATOR = '_'
  PATH_SEPARATOR = '=>'
  def __init__(self):
      self.connection = redis_connector.StrictRedis(host='localhost', port=6379, db=0)
      self.subscriber = self.connection.pubsub(ignore_subscribe_messages=True)
      self.path = ''
      StartActor.__init__(self)

  def get_path(self):
      return self.path.split(self.PATH_SEPARATOR)

  def handle(self, msg):
      arguments, path = msg.split(self.SEPARATOR)
      arguments = self.connection.get(arguments)
      path = self.connection.get(path)
      if not path or not arguments:
          return
      self.path = path
      parsed_path = self.get_path()
      im = Image.open(arguments)

      manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im})

  def prepare_input(self):
      self.subscriber.subscribe(self.CHANNEL)
      for msg in self.subscriber.listen():
          self.handle(msg['data'])

Jak widać, jako interfejsu wejściowego użyłem bazy noSql’owej – Redis. Nasłuchuje ona na kanał „tasks” gdzie dostaje stringa w formacie xxxx_yyyy gdzie xxxx to klucz pod który znajduje się ścieżka do pliku który ma być przetworzony a pod kluczem yyyy jest ścieżka przetwarzania w formacie AAA=>BBB. Całość tego dzieje się w funkcji

1
2
3
4
5
6
7
8
9
10
11
  def handle(self, msg):
      arguments, path = msg.split(self.SEPARATOR)
      arguments = self.connection.get(arguments)
      path = self.connection.get(path)
      if not path or not arguments:
          return
      self.path = path
      parsed_path = self.get_path()
      im = Image.open(arguments)

      manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im})

zauważ proszę że do kolejnego aktora nie jest wysyłany obiekt z ścieżką pliku a obiekt z już uchwytem do otwartego pliku. Wyżej pokazany manager bierze tablice przetwarzania w formacie [’AAA’, 'BBB’] dla argumentu 'AAA=>BBB’, wycina element z góry stosu, przekazuje pozostałą część do wyciętego Aktora, ten przetwarza i cykl się powtarza.

Stworzyłem sobie dwóch aktorów przetwarzajacych:

1
2
3
4
5
6
7
from ..abstract_actor import AbstractActor
from PIL import ImageFilter

class Blur(AbstractActor):
  def process(self, img):
      blurred = img.filter(ImageFilter.BLUR)
      return blurred
1
2
3
4
5
6
7
from ..abstract_actor import AbstractActor
from PIL import ImageFilter

class Detail(AbstractActor):
  def process(self, img):
      detailed = img.filter(ImageFilter.DETAIL)
      return detailed

odpowiednio – efekt rozmycia i wyostrzenia.

Na koniec jeszcze aktor kończący:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from ..abstract_actor import AbstractActor
import string, random


class FinishActor(AbstractActor):

  def random_generator(self, size=6, chars=string.ascii_uppercase + string.digits):
      return ''.join(random.choice(chars) for x in range(size))

  def process(self, img):
      print 'Saving in format %r .... ' % 'jpg'
      filename = '/home/mmazurek/' + self.random_generator() + '.jpg'
      img.save(filename, img.format)
      print 'Saved in %r' % filename

I zepnijmy wszystko do kupy :)

Plik main.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import gevent
from gevent.monkey import patch_all
from actors.abstract_actor import AbstractActor
from actors.finish_actors import *
from actors.initial_actors import *
from actors.processing_actors import *
ABSTRACT_ACTORS = ['StartActor']

def inheritors(klass):
  subclasses = []
  work = klass
  while work:
      parent = work.pop()
      for child in parent.__subclasses__():
          if child not in subclasses:
              if child.__name__ not in ABSTRACT_ACTORS:
                  subclasses.append(child)
              work.append(child)
  return subclasses

def setup():
  all_actors = inheritors([AbstractActor, StartActor])
  all_actors_instances = [actor() for actor in all_actors]
  to_join = []
  for actor in all_actors_instances:
      actor.start()
      manager.add(actor)
      to_join.append(actor)

  print 'Created %r' % to_join

  gevent.joinall(to_join)


patch_all()
setup()

Funkcja inheritors znajdzie nam wszystkie klasy dziedziczące po „AbstractActor” i „StartActor”. Taki prosty autodiscovery. Dla każdej znalezionej klasy tworzymy jej instancję, każdą startujemy (dodajemy do pętli zdarzeń), rejestrujemy do managera tak żeby była przez niego widoczna i odpalamy „joinall” – czekając na wszystkie. Zobaczmy jak to działa.

Obrazek wejściowy:

i efekt dla „Blur=>Detail”:

Logi tak wyglądają:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Creating <FinishActor at 0x7ff071118190> ...
Creating <Blur at 0x7ff071118230> ...
Creating <Detail at 0x7ff0711182d0> ...
Created [<RedisActor at 0x7ff0724dc690>, <FinishActor at 0x7ff071118190>, <Blur at 0x7ff071118230>, <Detail at 0x7
ff0711182d0>]
Putting into  <Blur at 0x7ff071118230>
Waiting in <FinishActor at 0x7ff071118190>
Waiting in <Blur at 0x7ff071118230>
Processing in <Blur at 0x7ff071118230>, next will be 'Detail'
Putting into  <Detail at 0x7ff0711182d0>
Waiting in <Detail at 0x7ff0711182d0>
Processing in <Detail at 0x7ff0711182d0>, next will be None
Actor None not found!
Putting into  <FinishActor at 0x7ff071118190>
Processing in <FinishActor at 0x7ff071118190>, next will be None
Saving in format 'jpg' ....
Saved in '/home/mmazurek/V5K02J.jpg'

Dla Blur=>Blur=>Blur=>Blur:

I dla np. Detail=>Blur=>Detail=>Detail=>Detail

Oczywiście w trakcie używania takiego systemu możemy dojść do wniosku że któraś z warstw (aktorów) ma większe obciążenie, w sensie przetwarzanie w niej trwa dłużej. Można wtedy dołożyć kolejną instancję już istniejącego Aktora:

1
all_actors_instances.append(Blur())

w funkcji setup() przed pętlą odpalającą greenlety.

Spowoduje to lekką zmianę w logach powitalnych:

1
2
3
4
5
6
7
8
9
Creating <FinishActor at 0x7f17aee32d70> ...
Creating <Blur at 0x7f17aee32cd0> ...
Creating <Detail at 0x7f17aee32eb0> ...
Creating <Blur at 0x7f17aee32e10> ...
Created [<RedisActor at 0x7f17aee32c30>, <FinishActor at 0x7f17aee32d70>, <Blur at 0x7f17aee32cd0>, <Detail at 0x7f17aee32eb0>, <Blur at 0x7f17aee32e10>]
Waiting in <FinishActor at 0x7f17aee32d70>
Waiting in <Blur at 0x7f17aee32cd0>
Waiting in <Detail at 0x7f17aee32eb0>
Waiting in <Blur at 0x7f17aee32e10>

Widać po prostu że dodatkowy greenlet został uruchomiony. Rozróżnić można je po adresie pamięci. Spróbujmy odpalić system dla danych Blur=>Blur=>Blur:

1
2
3
4
5
6
7
8
9
10
11
Putting into <Blur at 0x7f17aee32cd0>
Processing in <Blur at 0x7f17aee32cd0>, next will be 'Blur'
Putting into <Blur at 0x7f17aee32e10>
Processing in <Blur at 0x7f17aee32e10>, next will be 'Blur'
Putting into <Blur at 0x7f17aee32e10>
Processing in <Blur at 0x7f17aee32e10>, next will be None
Actor None not found!
Putting into <FinishActor at 0x7f17aee32d70>
Processing in <FinishActor at 0x7f17aee32d70>, next will be None
Saving in format 'jpg' ....
Saved in '/home/mmazurek/V14113.jpg'

I można zauważyć że raz obrazek przetwarza greenlet o adresie z końcówką cd0 a raz e10. Oczywiście nie jest to jakiś bardzo dobry sposób na wybór docelowego Aktora – można by pokusić się tutaj o np. jakiś loadballancing albo chociaż rotację liniową.

Warto dostrzec też że zbudowaliśmy architekturę potokową. Tzn podzieliliśmy jakiś proces na etapy i poszczególne etapy, mimo iż są zależne od siebie, to mogą przetwarzać inne zadania, tzn że jeśli zadanie #1 dojdzie do 3 etapu przetwarzania to drugi etap może już działać na zadaniu #2 a pierwszy na #3.

Oczywiście ten system jest pokazowy – nie do zastosowań produkcyjnych, brakuje mu np obsługi wyjątków, brak podnoszenia greenletów np. bo błędzie czy jakiejkolwiek odporności na awarie – w tym momencie wyjątek w którymkolwiek etapie blokuje pętle zdarzeń.

Może warto odpalić wiele pętli zdarzeń, każda na osobnym procesie?

Kombinować zawsze warto :)

Dzięki za wizytę,
Mateusz Mazurek
Mateusz M.

Ostatnie wpisy

Podsumowanie: luty i marzec 2024

Ostatnio tygodnie były tak bardzo wypełnione, że nie udało mi się napisać nawet krótkiego podsumowanie. Więc dziś zbiorczo podsumuję luty… Read More

2 tygodnie ago

Podsumowanie: styczeń 2024

Zapraszam na krótkie podsumowanie miesiąca. Książki W styczniu przeczytałem "Homo Deus: Historia jutra". Książka łudząco podoba do wcześniejszej książki tego… Read More

3 miesiące ago

Podsumowanie roku 2023

Cześć! Zapraszam na podsumowanie roku 2023. Książki Zacznijmy od książek. W tym roku cel 35 książek nie został osiągnięty. Niemniej… Read More

3 miesiące ago

Podsumowanie: grudzień 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W grudniu skończyłem czytać Mein Kampf. Nudna książka. Ciekawsze fragmenty można by było streścić… Read More

4 miesiące ago

Praca zdalna – co z nią dalej?

Cześć, ostatnio w Internecie pojawiło się dużo artykułów, które nie były przychylne pracy zdalnej. Z drugiej strony większość komentarzy pod… Read More

4 miesiące ago

Podsumowanie: listopad 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W listopadzie dokończyłem cykl "Z mgły zrodzony" Sandersona. Tylko "Stop prawa" mi nie do… Read More

5 miesięcy ago