Wątki i procesy w Pythonie

Wstęp

Czymże byłoby programowanie bez współbieżności i równoległości. Watki i procesy to elementy budulcowe wspomnianego modelu a ich współistnienie i dostęp do współdzielonych danych, to potężna broń na drodze ku wydajności i efektywności kodu. Czasem niestety taki styl pisania programów potrafi przyprawić ich autorów o siwiznę albo co gorsza, sprawić, że w ogóle nie będą mieć włosów.

Podstawy

Wątek to osobna jednostka wykonawcza działająca w obrębie tego samego procesu. Oznacza to, że najprostsze programy zawsze mają proces i co najmniej jeden wątek (nazywany głównym). Nawet programy typu „hello world”.

Proces to taki worek na wątki. W jego obrębie może działać wiele wątków (ale nie mniej niż jeden), które współdzielą pamięć. Różne procesy mają swoją przestrzeń pamięci.

Różnica między zwykłym procesem a wątkiem polega na współdzieleniu przez wszystkie wątki działające w danym procesie przestrzeni adresowej oraz wszystkich innych struktur systemowych (np. listy otwartych plików itp.) – z kolei procesy posiadają niezależne zasoby.

Rodzi to poważne konsekwencje:

  • wątki są lżejsze, tzn praca z nimi nie dostarcza tak dużego narzutu jak w przypadku pracy z procesami
  • przekazywanie danych pomiędzy wątkami jest znacznie tańsze niż pomiędzy procesami
  • proces zapewnia separację pamięci, a to zapewnia bezpieczeństwo danych

Praca z wątkami

Praca z wątkami jest zazwyczaj realizowana na trzy sposoby:

  • poprzez użycie klasy Thread
  • poprzez dziedziczenie z klasy Thread
  • poprzez pulę wątków

Pierwszy sposób wypisania 20 liczb, każda w osobnym wątku, korzystając z klasy Thread:

1
2
3
4
5
6
7
8
9
10
11
12
from threading import Thread

threads = []

for i in range(20):
  threads.append(Thread(target=print, args=(i, )))

for t in threads:
  t.start()

for t in threads:
  t.join()

I ten kod oczywiście zadziała, wypisze te 20 liczb.

Drugi sposób to dziedziczenie i redefinicja run:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread


class MyThread(Thread):
  def __init__(self, i):
      super().__init__()
      self._i = i

  def run(self) -> None:
      print(self._i)


threads = []

for i in range(20):
  threads.append(MyThread(i=i))

for t in threads:
  t.start()

for t in threads:
  t.join()

I trzeci, chyba najpopularniejszy sposób, czyli użycie puli wątków, czyli określonej ilości wątków która będzie utrzymywana w celu wykonania wskazanych zadań:

1
2
3
4
5
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=20) as executor:
  for i in range(20):
      executor.submit(print, i)

I teraz pytanie. Czy każdy z tych kawałków kodu, zawsze wypisze liczby w odpowiedniej kolejności i bez problemów? No nie zawsze. Ale zerknijmy na bardziej obrazowy przykład.

Race condition

Tłumaczą na polski to po prostu „wyścig”. Popatrz na poniższy kod.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

variable: int = 1
FILENAME: str = 'test'


def read_from_file() -> int:
  with open(FILENAME, 'r+') as file:
      val = file.read()
      return int(val)


def write_to_file(val: int) -> None:
  with open(FILENAME, 'w') as file:
      file.write(str(val))


def increment_value_in_file():
  value = read_from_file()
  write_to_file(value + 1)


write_to_file(0)


with ThreadPoolExecutor(10) as executor:
  for _ in range(10):
      executor.submit(increment_value_in_file)

print("Final value: ", read_from_file())

To prosty skrypt który chce pracować na pliku. Uruchamia on 10 wątków, każdy wątek ma odczytać plik, pobrać jego zawartość, dodać do niej 1 i zapisać do pliku. Zawartość pliku na początku wynosi 0. Prostym jest, że oczekujemy na końcu wyniku 10.

A jaki wynik dostajemy?

Całkowicie niedeterministyczny! Za każdym uruchomieniem programu dostajemy inną wartość.

Problemem jest brak synchronizacji. Wątki przeplatają się odczytując i zapisując plik, co sprawia, że nic nie stoi na przeszkodzie by np. trzy wątki odczytały tę samą wartość z pliku, bo żaden inny wątek nie zdąży nic zapisać. To natomiast sprawi, że te trzy wątki będą próbowały zapisać… Tę samą wartość do pliku.

Bałagan, prawda?


Czekaj, stop!

Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:

Aby potwierdzić swoją subskrypcję, odbierz pocztę i kliknij w link potwierdzający:) jeśli maila nie ma to poczekaj chwile i/lub sprawdź folder spam/inne/oferty itp :)

Aby potwierdzić swoją subskrypcję, odbierz pocztę i kliknij w link potwierdzający:) jeśli maila nie ma to poczekaj chwile i/lub sprawdź folder spam/inne/oferty itp :)
a w ramach prezentu otrzymasz całkowicie za darmo, dwa dokumenty PDF „6 (nie zawsze oczywistych) błędów popełnianych podczas nauki programowania” który jest jednym z efektów ponad siedmioletniej pracy i obserwacji rozwoju niejednego programisty oraz „Wstęp do testowania w Pythonie”, będący wprowadzeniem do biblioteki PyTest.
Jeśli to Cię interesuje to zapraszam również na swoje social media.

Jak i do ewentualnego postawienia mi kawy :)

Synchronizacja dostępu do danych

W przykładzie z poprzedniego akapitu zauważyliśmy problem w dostępie do pliku. Chodzi o to, że funkcja increment_value_in_file nie jest atomowa, tzn nie traktujemy w niej sekwencji „odczytaj plik, zapisz plik” jako jedność, a to sprawia, że może się ona wykonywać równolegle w kilku wątkach.

Kategorycznie nie można tak robić.

Poprawka w kodzie jest bardzo łatwa, zajmijmy się więc najpierw jej zrozumieniem.

Aby program zaczął działać poprawnie, należy użyć blokad. A dokładniej klasy Lock z modułu threading. Instancję tej klasy możemy używać jako context managera, a więc po prostu otoczyć nią ciało funkcji increment_value_in_file. Sprawi to, że dostęp do tego kodu będzie zarezerwowany dla jednego wątku w jednym czasie. Tzn jeśli np. watek numer 3 wejdzie w kawałek kodu objęty blokadą, to żaden inny wątek nie będzie mógł zrobić tego samego, do momentu wyjścia wątku nr. 3 z tego kodu.

I czas na kod:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Thread, Lock
from concurrent.futures import ThreadPoolExecutor

variable: int = 1
FILENAME: str = 'test'

lock = Lock()


def read_from_file() -> int:
  with open(FILENAME, 'r+') as file:
      val = file.read()
      return int(val)


def write_to_file(val: int) -> None:
  with open(FILENAME, 'w') as file:
      file.write(str(val))


def increment_value_in_file():
  with lock:
      value = read_from_file()
      write_to_file(value + 1)


write_to_file(0)


with ThreadPoolExecutor(10) as executor:
  for _ in range(10):
      executor.submit(increment_value_in_file)

print("Final value: ", read_from_file())

Pytanie sprawdzające: czy kod będzie działał poprawnie, jeśli funkcję increment_value_in_file napiszemy w ten sposób:

1
2
3
4
def increment_value_in_file():
  with Lock():
      value = read_from_file()
      write_to_file(value + 1)

A jeśli nie, to dlaczego?

Problemy tego typu to zasadnicza trudność korzystania z wątków ponieważ w dużych projektach dodanie blokady nie zawsze jest tak oczywiste jak tu. A ponad to, dodanie Lock’a sprawia, że inne wątki czekają, więc zmniejszają one wydajność całego systemu.

Procesy

Tak jak pisałem wcześniej, procesy są cięższe niż wątki ale nie dzielą między sobą pamięci, a to sprawia, że problem dostępu do tego samego elementu pamięci się rozwiązuje. Nadal jednak może wystąpić wiele innych problemów, jak np. już nam znany kłopot z dostępem do pliku.

Wersja wieloprocesowa tego podejścia mogłaby wyglądać tak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from multiprocessing import Process

variable: int = 1
FILENAME: str = 'test'


class MyProcess(Process):
  def __init__(self, func):
      super().__init__()
      self.func = func

  def run(self) -> None:
      self.func()


def read_from_file() -> int:
  with open(FILENAME, 'r+') as file:
      val = file.read()
      return int(val)


def write_to_file(val: int) -> None:
  with open(FILENAME, 'w') as file:
      file.write(str(val))


def increment_value_in_file():
  value = read_from_file()
  write_to_file(value + 1)


write_to_file(0)

processes = []

for i in range(100):
  processes.append(MyProcess(increment_value_in_file))

for p in processes:
  p.start()

for p in processes:
  p.join()

print("Final value: ", read_from_file())

I tutaj również rozwiązaniem jest Lock. Niestety nie jest to tak proste jak w przypadku wątków, bo trzeba wymyślić jak dzielić ten obiekt pomiędzy procesami. Jeśli to zepsujemy to dostaniemy błąd

RuntimeError: Lock objects should only be shared between processes through inheritance

Poprawnie możemy zrobić to tak, że po prostu przekazujemy obiekt Locka:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from multiprocessing import Lock, Process

variable: int = 1
FILENAME: str = 'test'
process_lock = Lock()


class MyProcess(Process):
  def __init__(self, func, lock):
      super().__init__()
      self.func = func
      self.lock = lock

  def run(self) -> None:
      with self.lock:
          self.func()




def read_from_file() -> int:
  with open(FILENAME, 'r+') as file:
      val = file.read()
      return int(val)


def write_to_file(val: int) -> None:
  with open(FILENAME, 'w') as file:
      file.write(str(val))


def increment_value_in_file():
  value = read_from_file()
  write_to_file(value + 1)


write_to_file(0)

processes = []

for i in range(1000):
  processes.append(MyProcess(increment_value_in_file, process_lock))

for p in processes:
  p.start()

for p in processes:
  p.join()

print("Final value: ", read_from_file())

Które podejście jest szybsze?

W tym przypadku który sobie omawiamy, odpowiedź na to pytanie już prawdopodobnie znasz. Pomierzyłem jednak czasy i oto wyniki:

For processes:

Exec time: 0.6507244110107422 [s]
Final value:  1000

For threads:

Exec time: 0.24805140495300293 [s]
Final value:  1000

Rezultat nie jest zadziwiający. Czas potrzebny na obsługę procesów jest znacznie większy niż w przypadku wątków. Czy to znaczy, że nie warto używać multiprocessingu?

Kiedy używać wielu procesów?

Przede wszystkim wtedy kiedy dzielimy kod który intensywnie wykorzystuje CPU. Popatrz na przykład:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from threading import Thread, Lock


def long_running_cpu_task():
  for _ in range(100000000):
      pass


threads = []

for _ in range(10):
  threads.append(Thread(target=long_running_cpu_task))

start = time.time()

for t in threads:
  t.start()

for t in threads:
  t.join()

stop = time.time()
print("Exec time for threads:", stop-start, "[s]")

I oczywiście jego odpowiednik ale z użyciem procesów:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from multiprocessing import Process


def long_running_cpu_task():
  for _ in range(100000000):
      pass


processes = []

for _ in range(10):
  processes.append(Process(target=long_running_cpu_task))

start = time.time()

for t in processes:
  t.start()

for t in processes:
  t.join()

stop = time.time()
print("Exec time for processes:", stop - start, "[s]")

Wyniki?

Exec time for threads: 21.000962018966675 [s]
Exec time for processes: 6.317837238311768 [s]

Różnica jest szokująca! A to wszystko „wina” GILa….

GIL

Czyli Global Interpreter Lock – mutex, który nie pozwala wielu wątkom używać interpretera w tym samym czasie. Brzmi niewinnie, ale pociąga za sobą konsekwencję którą można było zauważyć w poprzednim przykładzie. Upraszczając, sprawia to, że w CPythonie nie mamy pełnowartościowej wielowątkowości. Po prostu wewnętrzne struktury interpretera Pythona nie są thread-safe i pozwolenie wątkom na prawdziwą współbieżność, mogłoby je uszkodzić.

Pozostaje jedna niedopowiedziana kwestia – dlaczego w takim razie przykład z pracą na pliku działał zgodnie z „oczekiwaniami”? Odpowiedź jest prosta – GIL jest zwalniany dla operacji I/O a więc np. podczas pracy z plikami.

Słyszałem, że już jakiś czas temu były próby zastąpienia GILa większą ilością małych mutexów, ale o ironio – efekt był taki, że całość działała wolniej niż ze starym, „dobrym” GILem.

Podsumowując

Programowanie współbieżne i/lub równoległe jest nieodłączną i bardzo popularną praktyką w prawie każdym większym projekcie. Warto wiedzieć, że niesie ona za sobą ryzyko znacznego zwiększenia skomplikowania kodu, ale daje możliwość poprawy efektywności. Dla przykładu bez wątków nie byłoby programów z GUI, bo tam jeden wątek jest wydelegowany tylko do tego, by zapewnić responsywność interfejsu.

Dzięki za wizytę,
Mateusz Mazurek
Mateusz M.

Pokaż komentarze

  • Dzień dobry!

    To ja mam pytanie: wyobraźmy sobie, że mamy do przetworzenia 10 000 plików na zasadzie: otwórz, zrób coś, zamknij. Każdy plik osobno. Aż się prosi, żeby wykorzystać wielowątkowość, No teraz to pytanie - jak napisać program, żeby zawsze miał na przykład max. 5 wątków, żeby nie zarżnąć procesora a potem reszty zasobów?

    Pozdrawiam serdecznie - wierny czytelnik :)

    • Wierny czytelnik <3 takich lubię najbardziej! :D

      Problem o którym mówisz najlepiej rozwiązać pulą wątków. Jeśli to jest 10 000 różnych plików, czyli nie będzie problemów typu racecondition, to po prostu pula wątków, ona określa ile wątków ma być powołanych do życia. Jeśli pliki się powtarzają, to to trzeba będzie to synchronizować :D

  • Dzięki za super artykuł. Bardzo mi pomógł w zrozumieniu razdziału z "Zaawansowanego Pythona" Ramalho.

    Przy okazji pytanie:
    Wykonałem dwa ostatnie skrypty z wątkami i procesami i skrypt z wątkami na mojej maszynie wykonuje się ok. 10s. Natomiast ten z procesami... uwaga... najszybciej wykonuje sie przy 1 procesie range(1) w tempie ok 1s, a przy 14 procesach 1.76s:
    Masz jakieś wyjaśnienie?

    # inxi -C
    CPU:
     Info: 14-core (6-mt/8-st) model: 12th Gen Intel Core i7-12700H

  • ...chciałem oczywiście napisać "skrypt z wątkami"
    zamiast: "wątek z procesami"

Ostatnie wpisy

Podsumowanie: maj, czerwiec, lipiec i sierpień 2024

Oj daaawnoo mnie tu nie było. Ale wakacje to był czas dużej liczby intensywnych wyjazdów i tak naprawdę, dopiero jakoś… Read More

4 miesiące ago

Podsumowanie: kwiecień 2024

Cześć! Zapraszam na krótkie podsumowanie kwietnia. Wyjazd do Niemiec A dokładniej pod granicę z Francją. Chrześnica miała pierwszą komunię. Po… Read More

8 miesięcy ago

Podsumowanie: luty i marzec 2024

Ostatnio tygodnie były tak bardzo wypełnione, że nie udało mi się napisać nawet krótkiego podsumowanie. Więc dziś zbiorczo podsumuję luty… Read More

9 miesięcy ago

Podsumowanie: styczeń 2024

Zapraszam na krótkie podsumowanie miesiąca. Książki W styczniu przeczytałem "Homo Deus: Historia jutra". Książka łudząco podoba do wcześniejszej książki tego… Read More

11 miesięcy ago

Podsumowanie roku 2023

Cześć! Zapraszam na podsumowanie roku 2023. Książki Zacznijmy od książek. W tym roku cel 35 książek nie został osiągnięty. Niemniej… Read More

12 miesięcy ago

Podsumowanie: grudzień 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W grudniu skończyłem czytać Mein Kampf. Nudna książka. Ciekawsze fragmenty można by było streścić… Read More

1 rok ago