Mateusz Mazurek – programista z pasją

Python, architektura, ciekawostki ze świata IT

Inżynieria oprogramowania Programowanie Programowanie webowe

Wątki i procesy w Pythonie

Wstęp

Czymże byłoby programowanie bez współbieżności i równoległości. Watki i procesy to elementy budulcowe wspomnianego modelu a ich współistnienie i dostęp do współdzielonych danych, to potężna broń na drodze ku wydajności i efektywności kodu. Czasem niestety taki styl pisania programów potrafi przyprawić ich autorów o siwiznę albo co gorsza, sprawić, że w ogóle nie będą mieć włosów.

Podstawy

Wątek to osobna jednostka wykonawcza działająca w obrębie tego samego procesu. Oznacza to, że najprostsze programy zawsze mają proces i co najmniej jeden wątek (nazywany głównym). Nawet programy typu „hello world”.

Proces to taki worek na wątki. W jego obrębie może działać wiele wątków (ale nie mniej niż jeden), które współdzielą pamięć. Różne procesy mają swoją przestrzeń pamięci.

Różnica między zwykłym procesem a wątkiem polega na współdzieleniu przez wszystkie wątki działające w danym procesie przestrzeni adresowej oraz wszystkich innych struktur systemowych (np. listy otwartych plików itp.) – z kolei procesy posiadają niezależne zasoby.

Rodzi to poważne konsekwencje:

  • wątki są lżejsze, tzn praca z nimi nie dostarcza tak dużego narzutu jak w przypadku pracy z procesami
  • przekazywanie danych pomiędzy wątkami jest znacznie tańsze niż pomiędzy procesami
  • proces zapewnia separację pamięci, a to zapewnia bezpieczeństwo danych

Praca z wątkami

Praca z wątkami jest zazwyczaj realizowana na trzy sposoby:

  • poprzez użycie klasy Thread
  • poprzez dziedziczenie z klasy Thread
  • poprzez pulę wątków

Pierwszy sposób wypisania 20 liczb, każda w osobnym wątku, korzystając z klasy Thread:

1
2
3
4
5
6
7
8
9
10
11
12
from threading import Thread

threads = []

for i in range(20):
    threads.append(Thread(target=print, args=(i, )))

for t in threads:
    t.start()

for t in threads:
    t.join()

I ten kod oczywiście zadziała, wypisze te 20 liczb.

Drugi sposób to dziedziczenie i redefinicja run:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread


class MyThread(Thread):
    def __init__(self, i):
        super().__init__()
        self._i = i

    def run(self) -> None:
        print(self._i)


threads = []

for i in range(20):
    threads.append(MyThread(i=i))

for t in threads:
    t.start()

for t in threads:
    t.join()

I trzeci, chyba najpopularniejszy sposób, czyli użycie puli wątków, czyli określonej ilości wątków która będzie utrzymywana w celu wykonania wskazanych zadań:

1
2
3
4
5
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=20) as executor:
    for i in range(20):
        executor.submit(print, i)

I teraz pytanie. Czy każdy z tych kawałków kodu, zawsze wypisze liczby w odpowiedniej kolejności i bez problemów? No nie zawsze. Ale zerknijmy na bardziej obrazowy przykład.

Race condition

Tłumaczą na polski to po prostu „wyścig”. Popatrz na poniższy kod.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

variable: int = 1
FILENAME: str = 'test'


def read_from_file() -> int:
    with open(FILENAME, 'r+') as file:
        val = file.read()
        return int(val)


def write_to_file(val: int) -> None:
    with open(FILENAME, 'w') as file:
        file.write(str(val))


def increment_value_in_file():
    value = read_from_file()
    write_to_file(value + 1)


write_to_file(0)


with ThreadPoolExecutor(10) as executor:
    for _ in range(10):
        executor.submit(increment_value_in_file)

print("Final value: ", read_from_file())

To prosty skrypt który chce pracować na pliku. Uruchamia on 10 wątków, każdy wątek ma odczytać plik, pobrać jego zawartość, dodać do niej 1 i zapisać do pliku. Zawartość pliku na początku wynosi 0. Prostym jest, że oczekujemy na końcu wyniku 10.

A jaki wynik dostajemy?

Całkowicie niedeterministyczny! Za każdym uruchomieniem programu dostajemy inną wartość.

Problemem jest brak synchronizacji. Wątki przeplatają się odczytując i zapisując plik, co sprawia, że nic nie stoi na przeszkodzie by np. trzy wątki odczytały tę samą wartość z pliku, bo żaden inny wątek nie zdąży nic zapisać. To natomiast sprawi, że te trzy wątki będą próbowały zapisać… Tę samą wartość do pliku.

Bałagan, prawda?


Czekaj, stop!

Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:
a w ramach prezentu otrzymasz całkowicie za darmo, dwa dokumenty PDF „6 (nie zawsze oczywistych) błędów popełnianych podczas nauki programowania” który jest jednym z efektów ponad siedmioletniej pracy i obserwacji rozwoju niejednego programisty oraz „Wstęp do testowania w Pythonie”, będący wprowadzeniem do biblioteki PyTest.
Jeśli to Cię interesuje to zapraszam również na swoje social media.

Jak i do ewentualnego postawienia mi kawy :)
Postaw mi kawę na buycoffee.to

Synchronizacja dostępu do danych

W przykładzie z poprzedniego akapitu zauważyliśmy problem w dostępie do pliku. Chodzi o to, że funkcja increment_value_in_file nie jest atomowa, tzn nie traktujemy w niej sekwencji „odczytaj plik, zapisz plik” jako jedność, a to sprawia, że może się ona wykonywać równolegle w kilku wątkach.

Kategorycznie nie można tak robić.

Poprawka w kodzie jest bardzo łatwa, zajmijmy się więc najpierw jej zrozumieniem.

Aby program zaczął działać poprawnie, należy użyć blokad. A dokładniej klasy Lock z modułu threading. Instancję tej klasy możemy używać jako context managera, a więc po prostu otoczyć nią ciało funkcji increment_value_in_file. Sprawi to, że dostęp do tego kodu będzie zarezerwowany dla jednego wątku w jednym czasie. Tzn jeśli np. watek numer 3 wejdzie w kawałek kodu objęty blokadą, to żaden inny wątek nie będzie mógł zrobić tego samego, do momentu wyjścia wątku nr. 3 z tego kodu.

I czas na kod:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Thread, Lock
from concurrent.futures import ThreadPoolExecutor

variable: int = 1
FILENAME: str = 'test'

lock = Lock()


def read_from_file() -> int:
    with open(FILENAME, 'r+') as file:
        val = file.read()
        return int(val)


def write_to_file(val: int) -> None:
    with open(FILENAME, 'w') as file:
        file.write(str(val))


def increment_value_in_file():
    with lock:
        value = read_from_file()
        write_to_file(value + 1)


write_to_file(0)


with ThreadPoolExecutor(10) as executor:
    for _ in range(10):
        executor.submit(increment_value_in_file)

print("Final value: ", read_from_file())

Pytanie sprawdzające: czy kod będzie działał poprawnie, jeśli funkcję increment_value_in_file napiszemy w ten sposób:

1
2
3
4
def increment_value_in_file():
    with Lock():  
        value = read_from_file()
        write_to_file(value + 1)

A jeśli nie, to dlaczego?

Problemy tego typu to zasadnicza trudność korzystania z wątków ponieważ w dużych projektach dodanie blokady nie zawsze jest tak oczywiste jak tu. A ponad to, dodanie Lock’a sprawia, że inne wątki czekają, więc zmniejszają one wydajność całego systemu.

Procesy

Tak jak pisałem wcześniej, procesy są cięższe niż wątki ale nie dzielą między sobą pamięci, a to sprawia, że problem dostępu do tego samego elementu pamięci się rozwiązuje. Nadal jednak może wystąpić wiele innych problemów, jak np. już nam znany kłopot z dostępem do pliku.

Wersja wieloprocesowa tego podejścia mogłaby wyglądać tak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from multiprocessing import Process

variable: int = 1
FILENAME: str = 'test'


class MyProcess(Process):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def run(self) -> None:
        self.func()


def read_from_file() -> int:
    with open(FILENAME, 'r+') as file:
        val = file.read()
        return int(val)


def write_to_file(val: int) -> None:
    with open(FILENAME, 'w') as file:
        file.write(str(val))


def increment_value_in_file():
    value = read_from_file()
    write_to_file(value + 1)


write_to_file(0)

processes = []

for i in range(100):
    processes.append(MyProcess(increment_value_in_file))

for p in processes:
    p.start()

for p in processes:
    p.join()

print("Final value: ", read_from_file())

I tutaj również rozwiązaniem jest Lock. Niestety nie jest to tak proste jak w przypadku wątków, bo trzeba wymyślić jak dzielić ten obiekt pomiędzy procesami. Jeśli to zepsujemy to dostaniemy błąd

RuntimeError: Lock objects should only be shared between processes through inheritance

Poprawnie możemy zrobić to tak, że po prostu przekazujemy obiekt Locka:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from multiprocessing import Lock, Process

variable: int = 1
FILENAME: str = 'test'
process_lock = Lock()


class MyProcess(Process):
    def __init__(self, func, lock):
        super().__init__()
        self.func = func
        self.lock = lock

    def run(self) -> None:
        with self.lock:
            self.func()




def read_from_file() -> int:
    with open(FILENAME, 'r+') as file:
        val = file.read()
        return int(val)


def write_to_file(val: int) -> None:
    with open(FILENAME, 'w') as file:
        file.write(str(val))


def increment_value_in_file():
    value = read_from_file()
    write_to_file(value + 1)


write_to_file(0)

processes = []

for i in range(1000):
    processes.append(MyProcess(increment_value_in_file, process_lock))

for p in processes:
    p.start()

for p in processes:
    p.join()

print("Final value: ", read_from_file())

Które podejście jest szybsze?

W tym przypadku który sobie omawiamy, odpowiedź na to pytanie już prawdopodobnie znasz. Pomierzyłem jednak czasy i oto wyniki:

For processes:

Exec time: 0.6507244110107422 [s]
Final value:  1000

For threads:

Exec time: 0.24805140495300293 [s]
Final value:  1000

Rezultat nie jest zadziwiający. Czas potrzebny na obsługę procesów jest znacznie większy niż w przypadku wątków. Czy to znaczy, że nie warto używać multiprocessingu?

Kiedy używać wielu procesów?

Przede wszystkim wtedy kiedy dzielimy kod który intensywnie wykorzystuje CPU. Popatrz na przykład:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from threading import Thread, Lock


def long_running_cpu_task():
    for _ in range(100000000):
        pass


threads = []

for _ in range(10):
    threads.append(Thread(target=long_running_cpu_task))

start = time.time()

for t in threads:
    t.start()

for t in threads:
    t.join()

stop = time.time()
print("Exec time for threads:", stop-start, "[s]")

I oczywiście jego odpowiednik ale z użyciem procesów:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from multiprocessing import Process


def long_running_cpu_task():
    for _ in range(100000000):
        pass


processes = []

for _ in range(10):
    processes.append(Process(target=long_running_cpu_task))

start = time.time()

for t in processes:
    t.start()

for t in processes:
    t.join()

stop = time.time()
print("Exec time for processes:", stop - start, "[s]")

Wyniki?

Exec time for threads: 21.000962018966675 [s]
Exec time for processes: 6.317837238311768 [s]

Różnica jest szokująca! A to wszystko „wina” GILa….

GIL

Czyli Global Interpreter Lock – mutex, który nie pozwala wielu wątkom używać interpretera w tym samym czasie. Brzmi niewinnie, ale pociąga za sobą konsekwencję którą można było zauważyć w poprzednim przykładzie. Upraszczając, sprawia to, że w CPythonie nie mamy pełnowartościowej wielowątkowości. Po prostu wewnętrzne struktury interpretera Pythona nie są thread-safe i pozwolenie wątkom na prawdziwą współbieżność, mogłoby je uszkodzić.

Pozostaje jedna niedopowiedziana kwestia – dlaczego w takim razie przykład z pracą na pliku działał zgodnie z „oczekiwaniami”? Odpowiedź jest prosta – GIL jest zwalniany dla operacji I/O a więc np. podczas pracy z plikami.

Słyszałem, że już jakiś czas temu były próby zastąpienia GILa większą ilością małych mutexów, ale o ironio – efekt był taki, że całość działała wolniej niż ze starym, „dobrym” GILem.

Podsumowując

Programowanie współbieżne i/lub równoległe jest nieodłączną i bardzo popularną praktyką w prawie każdym większym projekcie. Warto wiedzieć, że niesie ona za sobą ryzyko znacznego zwiększenia skomplikowania kodu, ale daje możliwość poprawy efektywności. Dla przykładu bez wątków nie byłoby programów z GUI, bo tam jeden wątek jest wydelegowany tylko do tego, by zapewnić responsywność interfejsu.

Dzięki za wizytę,
Mateusz Mazurek

A może wolisz nowości na mail?

Subskrybuj
Powiadom o
guest

Witryna wykorzystuje Akismet, aby ograniczyć spam. Dowiedz się więcej jak przetwarzane są dane komentarzy.

9 komentarzy
Inline Feedbacks
View all comments

[…] więcej informacji o wątkach i procesach w Pythonie umieściłem w tym artykule. Sugeruję przeczytać go, zanim przejdziesz do dalszego czytania tego […]

Marek

Dzień dobry!

To ja mam pytanie: wyobraźmy sobie, że mamy do przetworzenia 10 000 plików na zasadzie: otwórz, zrób coś, zamknij. Każdy plik osobno. Aż się prosi, żeby wykorzystać wielowątkowość, No teraz to pytanie – jak napisać program, żeby zawsze miał na przykład max. 5 wątków, żeby nie zarżnąć procesora a potem reszty zasobów?

Pozdrawiam serdecznie – wierny czytelnik :)

Piotr

Dzięki za super artykuł. Bardzo mi pomógł w zrozumieniu razdziału z „Zaawansowanego Pythona” Ramalho.

Przy okazji pytanie:
Wykonałem dwa ostatnie skrypty z wątkami i procesami i skrypt z wątkami na mojej maszynie wykonuje się ok. 10s. Natomiast ten z procesami… uwaga… najszybciej wykonuje sie przy 1 procesie range(1) w tempie ok 1s, a przy 14 procesach 1.76s:
Masz jakieś wyjaśnienie?

# inxi -C
CPU:
 Info: 14-core (6-mt/8-st) model: 12th Gen Intel Core i7-12700H

Piotr

…chciałem oczywiście napisać „skrypt z wątkami”
zamiast: „wątek z procesami”

[…] Wątki i procesy w Pythonie […]

[…] Wątki i procesy w Pythonie […]