Wstęp
Czymże byłoby programowanie bez współbieżności i równoległości. Watki i procesy to elementy budulcowe wspomnianego modelu a ich współistnienie i dostęp do współdzielonych danych, to potężna broń na drodze ku wydajności i efektywności kodu. Czasem niestety taki styl pisania programów potrafi przyprawić ich autorów o siwiznę albo co gorsza, sprawić, że w ogóle nie będą mieć włosów.
Podstawy
Wątek to osobna jednostka wykonawcza działająca w obrębie tego samego procesu. Oznacza to, że najprostsze programy zawsze mają proces i co najmniej jeden wątek (nazywany głównym). Nawet programy typu „hello world”.
Proces to taki worek na wątki. W jego obrębie może działać wiele wątków (ale nie mniej niż jeden), które współdzielą pamięć. Różne procesy mają swoją przestrzeń pamięci.
Różnica między zwykłym procesem a wątkiem polega na współdzieleniu przez wszystkie wątki działające w danym procesie przestrzeni adresowej oraz wszystkich innych struktur systemowych (np. listy otwartych plików itp.) – z kolei procesy posiadają niezależne zasoby.
Rodzi to poważne konsekwencje:
- wątki są lżejsze, tzn praca z nimi nie dostarcza tak dużego narzutu jak w przypadku pracy z procesami
- przekazywanie danych pomiędzy wątkami jest znacznie tańsze niż pomiędzy procesami
- proces zapewnia separację pamięci, a to zapewnia bezpieczeństwo danych
Praca z wątkami
Praca z wątkami jest zazwyczaj realizowana na trzy sposoby:
- poprzez użycie klasy Thread
- poprzez dziedziczenie z klasy Thread
- poprzez pulę wątków
Pierwszy sposób wypisania 20 liczb, każda w osobnym wątku, korzystając z klasy Thread:
1 2 3 4 5 6 7 8 9 10 11 12 | from threading import Thread threads = [] for i in range(20): threads.append(Thread(target=print, args=(i, ))) for t in threads: t.start() for t in threads: t.join() |
I ten kod oczywiście zadziała, wypisze te 20 liczb.
Drugi sposób to dziedziczenie i redefinicja run:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | from threading import Thread class MyThread(Thread): def __init__(self, i): super().__init__() self._i = i def run(self) -> None: print(self._i) threads = [] for i in range(20): threads.append(MyThread(i=i)) for t in threads: t.start() for t in threads: t.join() |
I trzeci, chyba najpopularniejszy sposób, czyli użycie puli wątków, czyli określonej ilości wątków która będzie utrzymywana w celu wykonania wskazanych zadań:
1 2 3 4 5 | from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(max_workers=20) as executor: for i in range(20): executor.submit(print, i) |
I teraz pytanie. Czy każdy z tych kawałków kodu, zawsze wypisze liczby w odpowiedniej kolejności i bez problemów? No nie zawsze. Ale zerknijmy na bardziej obrazowy przykład.
Race condition
Tłumaczą na polski to po prostu „wyścig”. Popatrz na poniższy kod.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | from threading import Thread from concurrent.futures import ThreadPoolExecutor variable: int = 1 FILENAME: str = 'test' def read_from_file() -> int: with open(FILENAME, 'r+') as file: val = file.read() return int(val) def write_to_file(val: int) -> None: with open(FILENAME, 'w') as file: file.write(str(val)) def increment_value_in_file(): value = read_from_file() write_to_file(value + 1) write_to_file(0) with ThreadPoolExecutor(10) as executor: for _ in range(10): executor.submit(increment_value_in_file) print("Final value: ", read_from_file()) |
To prosty skrypt który chce pracować na pliku. Uruchamia on 10 wątków, każdy wątek ma odczytać plik, pobrać jego zawartość, dodać do niej 1 i zapisać do pliku. Zawartość pliku na początku wynosi 0. Prostym jest, że oczekujemy na końcu wyniku 10.
A jaki wynik dostajemy?
Całkowicie niedeterministyczny! Za każdym uruchomieniem programu dostajemy inną wartość.
Problemem jest brak synchronizacji. Wątki przeplatają się odczytując i zapisując plik, co sprawia, że nic nie stoi na przeszkodzie by np. trzy wątki odczytały tę samą wartość z pliku, bo żaden inny wątek nie zdąży nic zapisać. To natomiast sprawi, że te trzy wątki będą próbowały zapisać… Tę samą wartość do pliku.
Bałagan, prawda?
Czekaj, stop!
Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:Jeśli to Cię interesuje to zapraszam również na swoje social media.
Jak i do ewentualnego postawienia mi kawy :)
Synchronizacja dostępu do danych
W przykładzie z poprzedniego akapitu zauważyliśmy problem w dostępie do pliku. Chodzi o to, że funkcja increment_value_in_file nie jest atomowa, tzn nie traktujemy w niej sekwencji „odczytaj plik, zapisz plik” jako jedność, a to sprawia, że może się ona wykonywać równolegle w kilku wątkach.
Kategorycznie nie można tak robić.
Poprawka w kodzie jest bardzo łatwa, zajmijmy się więc najpierw jej zrozumieniem.
Aby program zaczął działać poprawnie, należy użyć blokad. A dokładniej klasy Lock z modułu threading. Instancję tej klasy możemy używać jako context managera, a więc po prostu otoczyć nią ciało funkcji increment_value_in_file. Sprawi to, że dostęp do tego kodu będzie zarezerwowany dla jednego wątku w jednym czasie. Tzn jeśli np. watek numer 3 wejdzie w kawałek kodu objęty blokadą, to żaden inny wątek nie będzie mógł zrobić tego samego, do momentu wyjścia wątku nr. 3 z tego kodu.
I czas na kod:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | from threading import Thread, Lock from concurrent.futures import ThreadPoolExecutor variable: int = 1 FILENAME: str = 'test' lock = Lock() def read_from_file() -> int: with open(FILENAME, 'r+') as file: val = file.read() return int(val) def write_to_file(val: int) -> None: with open(FILENAME, 'w') as file: file.write(str(val)) def increment_value_in_file(): with lock: value = read_from_file() write_to_file(value + 1) write_to_file(0) with ThreadPoolExecutor(10) as executor: for _ in range(10): executor.submit(increment_value_in_file) print("Final value: ", read_from_file()) |
Pytanie sprawdzające: czy kod będzie działał poprawnie, jeśli funkcję increment_value_in_file napiszemy w ten sposób:
1 2 3 4 | def increment_value_in_file(): with Lock(): value = read_from_file() write_to_file(value + 1) |
A jeśli nie, to dlaczego?
Problemy tego typu to zasadnicza trudność korzystania z wątków ponieważ w dużych projektach dodanie blokady nie zawsze jest tak oczywiste jak tu. A ponad to, dodanie Lock’a sprawia, że inne wątki czekają, więc zmniejszają one wydajność całego systemu.
Procesy
Tak jak pisałem wcześniej, procesy są cięższe niż wątki ale nie dzielą między sobą pamięci, a to sprawia, że problem dostępu do tego samego elementu pamięci się rozwiązuje. Nadal jednak może wystąpić wiele innych problemów, jak np. już nam znany kłopot z dostępem do pliku.
Wersja wieloprocesowa tego podejścia mogłaby wyglądać tak:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | from multiprocessing import Process variable: int = 1 FILENAME: str = 'test' class MyProcess(Process): def __init__(self, func): super().__init__() self.func = func def run(self) -> None: self.func() def read_from_file() -> int: with open(FILENAME, 'r+') as file: val = file.read() return int(val) def write_to_file(val: int) -> None: with open(FILENAME, 'w') as file: file.write(str(val)) def increment_value_in_file(): value = read_from_file() write_to_file(value + 1) write_to_file(0) processes = [] for i in range(100): processes.append(MyProcess(increment_value_in_file)) for p in processes: p.start() for p in processes: p.join() print("Final value: ", read_from_file()) |
I tutaj również rozwiązaniem jest Lock. Niestety nie jest to tak proste jak w przypadku wątków, bo trzeba wymyślić jak dzielić ten obiekt pomiędzy procesami. Jeśli to zepsujemy to dostaniemy błąd
RuntimeError: Lock objects should only be shared between processes through inheritance
Poprawnie możemy zrobić to tak, że po prostu przekazujemy obiekt Locka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | from multiprocessing import Lock, Process variable: int = 1 FILENAME: str = 'test' process_lock = Lock() class MyProcess(Process): def __init__(self, func, lock): super().__init__() self.func = func self.lock = lock def run(self) -> None: with self.lock: self.func() def read_from_file() -> int: with open(FILENAME, 'r+') as file: val = file.read() return int(val) def write_to_file(val: int) -> None: with open(FILENAME, 'w') as file: file.write(str(val)) def increment_value_in_file(): value = read_from_file() write_to_file(value + 1) write_to_file(0) processes = [] for i in range(1000): processes.append(MyProcess(increment_value_in_file, process_lock)) for p in processes: p.start() for p in processes: p.join() print("Final value: ", read_from_file()) |
Które podejście jest szybsze?
W tym przypadku który sobie omawiamy, odpowiedź na to pytanie już prawdopodobnie znasz. Pomierzyłem jednak czasy i oto wyniki:
For processes: Exec time: 0.6507244110107422 [s] Final value: 1000 For threads: Exec time: 0.24805140495300293 [s] Final value: 1000
Rezultat nie jest zadziwiający. Czas potrzebny na obsługę procesów jest znacznie większy niż w przypadku wątków. Czy to znaczy, że nie warto używać multiprocessingu?
Kiedy używać wielu procesów?
Przede wszystkim wtedy kiedy dzielimy kod który intensywnie wykorzystuje CPU. Popatrz na przykład:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | import time from threading import Thread, Lock def long_running_cpu_task(): for _ in range(100000000): pass threads = [] for _ in range(10): threads.append(Thread(target=long_running_cpu_task)) start = time.time() for t in threads: t.start() for t in threads: t.join() stop = time.time() print("Exec time for threads:", stop-start, "[s]") |
I oczywiście jego odpowiednik ale z użyciem procesów:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | import time from multiprocessing import Process def long_running_cpu_task(): for _ in range(100000000): pass processes = [] for _ in range(10): processes.append(Process(target=long_running_cpu_task)) start = time.time() for t in processes: t.start() for t in processes: t.join() stop = time.time() print("Exec time for processes:", stop - start, "[s]") |
Wyniki?
Exec time for threads: 21.000962018966675 [s] Exec time for processes: 6.317837238311768 [s]
Różnica jest szokująca! A to wszystko „wina” GILa….
GIL
Czyli Global Interpreter Lock – mutex, który nie pozwala wielu wątkom używać interpretera w tym samym czasie. Brzmi niewinnie, ale pociąga za sobą konsekwencję którą można było zauważyć w poprzednim przykładzie. Upraszczając, sprawia to, że w CPythonie nie mamy pełnowartościowej wielowątkowości. Po prostu wewnętrzne struktury interpretera Pythona nie są thread-safe i pozwolenie wątkom na prawdziwą współbieżność, mogłoby je uszkodzić.
Pozostaje jedna niedopowiedziana kwestia – dlaczego w takim razie przykład z pracą na pliku działał zgodnie z „oczekiwaniami”? Odpowiedź jest prosta – GIL jest zwalniany dla operacji I/O a więc np. podczas pracy z plikami.
Słyszałem, że już jakiś czas temu były próby zastąpienia GILa większą ilością małych mutexów, ale o ironio – efekt był taki, że całość działała wolniej niż ze starym, „dobrym” GILem.
Podsumowując
Programowanie współbieżne i/lub równoległe jest nieodłączną i bardzo popularną praktyką w prawie każdym większym projekcie. Warto wiedzieć, że niesie ona za sobą ryzyko znacznego zwiększenia skomplikowania kodu, ale daje możliwość poprawy efektywności. Dla przykładu bez wątków nie byłoby programów z GUI, bo tam jeden wątek jest wydelegowany tylko do tego, by zapewnić responsywność interfejsu.
Mateusz Mazurek
[…] więcej informacji o wątkach i procesach w Pythonie umieściłem w tym artykule. Sugeruję przeczytać go, zanim przejdziesz do dalszego czytania tego […]
Dzień dobry!
To ja mam pytanie: wyobraźmy sobie, że mamy do przetworzenia 10 000 plików na zasadzie: otwórz, zrób coś, zamknij. Każdy plik osobno. Aż się prosi, żeby wykorzystać wielowątkowość, No teraz to pytanie – jak napisać program, żeby zawsze miał na przykład max. 5 wątków, żeby nie zarżnąć procesora a potem reszty zasobów?
Pozdrawiam serdecznie – wierny czytelnik :)
Wierny czytelnik <3 takich lubię najbardziej! :D
Problem o którym mówisz najlepiej rozwiązać pulą wątków. Jeśli to jest 10 000 różnych plików, czyli nie będzie problemów typu racecondition, to po prostu pula wątków, ona określa ile wątków ma być powołanych do życia. Jeśli pliki się powtarzają, to to trzeba będzie to synchronizować :D
Dzięki za super artykuł. Bardzo mi pomógł w zrozumieniu razdziału z „Zaawansowanego Pythona” Ramalho.
Przy okazji pytanie:
Wykonałem dwa ostatnie skrypty z wątkami i procesami i skrypt z wątkami na mojej maszynie wykonuje się ok. 10s. Natomiast ten z procesami… uwaga… najszybciej wykonuje sie przy 1 procesie range(1) w tempie ok 1s, a przy 14 procesach 1.76s:
Masz jakieś wyjaśnienie?
# inxi -C
CPU:
Info: 14-core (6-mt/8-st) model: 12th Gen Intel Core i7-12700H
Wydaje mi się, że ta dodatkowe 3/4 sekundy to efekt tego, że procesy są dość ciężkie i ich stworzenie trwa. Możesz to spróbować potwierdzić/obalić korzystając z jakiegoś profilera np z yappi https://mmazurek.dev/profilowanie-pythona-z-yappi/ :)
…chciałem oczywiście napisać „skrypt z wątkami”
zamiast: „wątek z procesami”
Poprawiłem Twój powyższy komentarz:)
[…] Wątki i procesy w Pythonie […]
[…] Wątki i procesy w Pythonie […]