ZeroMQ – sposób na komunikację międzyprocesową
Cześć,
ostatnio sporo się dzieje więc blog zostaje w tyle, ale pomysł na wpis, który od dawna już istnieje – dziś doczekał się realizacji :)
W dobie kiedy to skalowanie aplikacji wykonuje się najczęściej horyzontalnie (dokładając nowe, samodzielne jednostki do istniejącej infrastruktury), bardzo dużą popularność zyskała idea mikroserwisów, czyli podejścia które mówi żeby dzielić dostarczaną usługę na wiele małych usług i pisać małe, specjalizujące się w jednej rzeczy, programy, tak by dało się łatwo zarządzać taką infrastrukturą. Całkiem nieźle przedstawia to poniższy rysunek:
Gdzie pojedyncze koła zębate to oddzielne programy pracujące na całość większej usługi. Podejście takie ma wiele zalet, poza już wspomnianą przeze mnie, łatwością zarządzania, przez łatwiejsze debuggowanie, możliwość zapanowania nad awariami, możliwość wyłączania konkretnych funkcjonalności czy chociażby czytelnością kodu (im mniej go tym zazwyczaj lepiej).
Ten rysunek wyżej ma, poza oczywiście kołami zębatymi (konkretnymi procesami) jedną niesamowicie istotną cechę. Zerknij na niego, proszę, jeszcze raz i zauważ że koła zębate są połączone ze sobą. Zębami. Co pozwala na to by ruch pierwszego koła wprawił w ruch całą przekładnię zębatą.
Podobnie jest z budowanie infrastruktury procesowej – jeden proces musi komunikować się jakoś z innymi by móc dostarczyć jakąś określoną funkcjonalność.
Kończąc ten przydługi wstęp, chcę powiedzieć iż właśnie owa komunikacja międzyprocesowa będzie tematem tego wpisu.
Rozwiązanie problemu komunikacji może być framework zwany ZeroMQ. MQ jest od Message Queue czyli od kolejki komunikatów. Zero natomiast wskazuje na minimalizm tej usługi.
Biblioteka ta posiada swojego „konektory” do wieeeeelu języków, spis jest tutaj – klik – nie widzę sensu tego wklejać tu – i jak widać raczej problem „egzotycznego języka” odpada ;)
A wiec w dużym skrócie ZeroMQ pozwala na wysłanie jakiejś wiadomości z procesu A do procesu B. Z tym że sposób w jaki można to zrobić jest wiele. Proces B może być grupą procesów, więc wtedy mamy multicast – czyli jeden do wielu. Jeśli A jest również grupą procesów to mamy wiele do wielu. Tak czy siak – możliwości jest wiele. ZeroMQ pozwala na stosowania takich modeli:
Request/Reply Pattern: Ten model działa troszkę na zasadzie protokołu HTTP – klient wysyła REQUEST a serwer zwraca RESPONSE. Jeden do jednego.
Publish/Subscribe Pattern: Jeden proces pisze na „kanał” a inne procesy odbierają wiadomości. Procesy odbierające subskrybują sie z konkretnym patternem, a więc mogą filtrować wiadomości jakie do nich przychodzą. Jeśli proces piszący (PUB) nie ma zasubskrybowanych procesów (SUB) to wszystkie wiadomości jakie wysyła – przepadają. Wiele do wielu.
Pipeline Pattern (PUSH/PULL): Podobne do podejścia PUB/SUB, z tym że tutaj procesy odbierające nie mogą się patternować i wiadomości docierają w trybie karuzelowym.
Exclusive Pair Pattern: Połączenie jeden do jednego przypominające TCP (dwukierunkowe).
Teraz jeszcze należy określić jakim transportem nasze wiadomości będą wysyłane, do wyboru mamy:
In-Process (INPROC): Transport wewnątrz procesu (pomiędzy wątkami).
Inter-Process (IPC): Transport pomiędzy procesami w obrębie tej samej maszyny.
TCP: Komunikacja przez sieć – TCP.
PGM: Protokół multicastowy dla komunikacji przez sieć.
Oczywiście siła ZeroMQ jest łączenie konkretnych modeli z konkretnymi transportami, tak żeby uzyskać oczekiwaną strukturę sieci. Ponad to ZeroMQ posiada narzędzia takie jak:
Forwarder – może służyć jako proxy – łączyć i kolejkować wiele PUB’ów z wieloma SUB’ami.
Queue – element pośredniczący pomiędzy REQUEST’em a RESPONSE’em. Kolejkuje wiadomości.
Streamer – element pośredniczący pomiędzy PUSH’em a PULL’em – również kolejkuje wiadomości.
A więc czas na przykład. Napiszmy prosty program który w obrębie tej samej maszyny będzie wysyłał do innego procesu liczby losowe od 1 do 3 w odstępnie co 3 sekundy. Procesy odbierające będą przyjmowały za parametr liczbę na którą mają nasłuchiwać.
A więc producent:
1 2 3 4 5 6 7 8 9 10 11 12 | import zmq from time import sleep from random import randint context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("ipc:///test.matt") while True: sleep(3) socket.send_string(str(randint(1,3))) |
I konsument:
1 2 3 4 5 6 7 8 9 10 11 12 | import sys import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("ipc:///test.matt") socket.setsockopt_string(zmq.SUBSCRIBE, sys.argv[1]) while True: string = socket.recv_string() print(string) |
A więc wywołanie producenta oraz dwóch konsumentów z parametrami, jeden „1” a drugi „2” pozwoli i na odbieranie tylko i wyłączenie wiadomości zaczynających się od „1” lub „2”. Jest to wykonane za pomocą socket.setsockopt_string.
Połączenie w obu skryptach jest realizowane za pomocą funkcji connect/bind która przyjmuje jako argument protokół jaki wybraliśmy i ścieżkę, w przypadku IPC, transport ten jest realizowany jako UDS (Unix Domain Socket) i /test.matt to po prostu ścieżka do niego. Przykładowo jeśli chcielibyśmy wyjść z naszym demonem poza lokalną maszynę należałoby skorzystać np. z TCP i argument wtedy by wyglądał np. tak: tcp://0.0.0.0:5000 – co uruchomi socket na porcie 5000 na wszystkich interfejsach.
Może teraz zmieńmy język – zobaczmy jak to wygląda w przypadku połączenia PHP i Pythona dla modelu PUSH/PULL.
Proces PHPa będzie producentem „zadań” które będzie PUSH’ować do workerów (procesów Pythona) które natomiast rezultat będą przesyłać (PUSH) do innego procesu PHPa. A więc procesy Pythona będą jednocześnie pisać i czytać wiadomości, schemat poglądowy wygląda tak:
No więc, przechodząc do kodu, zacznijmy od producenta:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | <?php $context = new ZMQContext(); $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sender->bind("ipc:///receive.matt"); $i = 0; while (true) { $sender->send($i); $i++; echo "Msg sent!\n"; sleep(2); } |
Worker:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import zmq from sys import argv context = zmq.Context() receiver = context.socket(zmq.PULL) sender = context.socket(zmq.PUSH) receiver.connect("ipc:///receive.matt") sender.connect("ipc:///results.matt") print("Listening..\n") def process(message): value = int(message) return {'worker': argv[1], 'result':value*value} while True: message_to_process = receiver.recv_string() print("Got msg!") result = process(message_to_process) sender.send_json(result) print("Task completed!\n") |
I program zbierający dane:
1 2 3 4 5 6 7 8 9 10 11 12 13 | <?php $context = new ZMQContext(); $rec = new ZMQSocket($context, ZMQ::SOCKET_PULL); $rec->bind("ipc:///results.matt"); $i = 0; while (true) { $result = $rec->recv(); $obj = json_decode($result); echo "Yay! We've got a result.. \nFrom ".$obj->worker." with value: ".$obj->result."\n"; } |
A więc wynikiem na konsoli producenta będzie ciąg „Msg sent!” ;) Workery będą podnosić do potęgi dostarczone im liczby w strategii karuzelowej a więc można powiedzieć że ten model komunikacji może służyć jako load ballancer, chodź jest to może nieco zbyt hucznie nazwane. Program kolekcjonujący dane będzie wypisywał dostarczony kwadrat liczby i nazwę workera od którego ją dostał, a więc np:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | Yay! We've got a result.. From Pierwszy with value: 25 Yay! We've got a result.. From Drugi with value: 36 Yay! We've got a result.. From Pierwszy with value: 49 Yay! We've got a result.. From Drugi with value: 64 Yay! We've got a result.. From Pierwszy with value: 81 Yay! We've got a result.. From Drugi with value: 100 Yay! We've got a result.. From Pierwszy with value: 121 Yay! We've got a result.. From Drugi with value: 144 |
Co też jest dowodem na działanie strategii karuzelowej. Główną różnicą między PUB/SUB a PUSH/PULL jest to, że PUSH wysyła wiadomości właśnie w tej strategii, natomiast PUB/SUB wysyła wiadomość do wszystkich zasubskrybowanych procesów.
Powoli kończąc wpis, chce dodać że jeśli wykorzystamy supervisor’a o którym pisałem w tym poście – link i możliwości ZeroMQ, jesteśmy w stanie zbudować prawie nieograniczone środowisko produkcyjne. Przez nieograniczone mam na myśli że o dowolnym stopniu skomplikowania, wykorzystując przy tym wszystkie zalety mikroserwisów. Aby ewentualnie rozbudować taką infrastrukturę można by pokusić się monitoring usług systemowych, tj samego supervisora czy chociażby jakiegoś serwera HTTP – można do tego użyć chociażby Zabbixa.
Jeśli macie jakieś inne, fajne rozwiązanie, podzielcie się z nimi, proszę, w komentarzu ;) chętni się zapoznam.
Mateusz Mazurek