Redis i Python – dobrze dobrana para #6
Cześć.
Seria artykułów o Redisie i Pythonie to już zbiór 5 wpisów. Pierwsze cztery to wprowadzenie do Redisa, podstawowe sposoby użycia, trochę przydatnych informacji i przede wszystkim – podstawowe typy danych. Piąty artykuł dotyczy mechanizmu publish-subscribe. Niżej zostawiam linki, żeby osoby, które dopiero do mnie dotarły, nie zgubiły całości:
- https://mmazurek.dev/redis-i-python-dobrze-dobrana-para-5/
- https://mmazurek.dev/redis-i-python-dobrze-dobrana-para-4/
- https://mmazurek.dev/redis-i-python-dobrze-dobrana-para-3/
- https://mmazurek.dev/redis-i-python-dobrze-dobrana-para-2/
- https://mmazurek.dev/redis-i-python-dobrze-dobrana-para-1/
Wnikliwi czytelnicy łatwo zauważą, że Redis jak na razie dostarcza dwie możliwości przesyłania komunikatów:
- mechanizm blokujących list
- mechanizm publish-subscribe
Dziś pokażę trzeci sposób. Jest on dość nowym ficzerem, pojawił się w wersji 5.0 (gdzie najnowsza stabilna wersja w momencie pisania tego artykułu to 6.0.8)
Czym są strumienie?
Strumienie to typ danych który pracuje w trybie „append-only”. Znaczy to tylko tyle, że wiadomości które „wlejemy” do strumienia lądują na jego końcu. Nie możemy dodawać elementów w konkretne miejsce strumienia. W odróżnieniu od list i pub-sub, strumienie pozwalają na przesyłanie danych nieco bardziej ustrukturyzowanych niż stringi. Każda wiadomość w strumieniu ma swoje unikalne ID, generowane przez Redisa lub ustawione przez użytkownika.
Podstawowe użycie
Podstawową komendą używaną do pracy ze strumieniami są polecenia XADD i XREAD. Taki kod:
1 2 3 4 5 6 7 8 9 | from redis import Redis redis_connection = Redis(decode_responses=True, db=0) stream_name = 'testowy_strumien' redis_connection.xadd(stream_name, {'testowy_klucz': 'testowa_wartosc'}) message = redis_connection.xread({stream_name: '0-0'}, block=None, count=1) print(message) |
doda do strumienia słownik i odczyta z niego, podaną w parametrze count ilość elementów. Parametr block określa czy i jeśli tak, to na ile milisekund, funkcja ta ma być blokując. Umożliwia to używanie jej w pętli. Nazwa strumienia do którego się podłączamy to słownik, którego kluczem jest nazwa strumienia a wartością ID od którego chcemy dane odczytywać. W tym przypadku – od początku.
Efektem wykonania się tego programu będzie:
[['testowy_strumien', [('1599850879420-0', {'testowy_klucz': 'testowa_wartosc'})]]]
To co ważne w tym outpucie to identyfikator 1599850879420-0. Jak napisałem wcześniej, każdy element strumienia posiada swój identyfikator, jego budowa przedstawia się tak:
a przekazując do metody xadd dodatkowy parametr id, możemy ten identyfikator nadpisać. W ogromnej ilości przypadków nie będzie to konieczne.
Czekaj, stop!
Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:Jeśli to Cię interesuje to zapraszam również na swoje social media.
Jak i do ewentualnego postawienia mi kawy :)
Trochę bardziej zaawansowane użycie
Dane ze strumienia nie są kasowane. Ma to kilka konsekwencji. Jedną z nich jest, że jeśli program z poprzedniego „rozdziału” zostanie uruchomiony np. 5 razy to zawsze będzie odczytywana ze strumienia pierwsza dodana do niego wartość, mimo że xadd będzie nowe wartości dodawał. Łatwo to potwierdzić, uruchom program np. 5 razy i potem zmień count na 5 i dostajesz w odpowiedzi 5 ostatnich elementów strumienia. Jest to zachowanie pożądane, ponieważ musimy jawnie oznaczyć wiadomość jako przetworzoną. Sprawia to że żaden niechciany exception nie spowoduje, że elementy będą nam ginąć.
Idąc dalej – nie zawsze chcemy zaczynać otrzymywać elementy ze strumienia od początku jego istnienia. Faktem jest, że w poprzednim kawałku kodu, jawnie zdefiniowaliśmy „0-0”, czyli chęć otrzymywania zawartości od początku. Wystarczy że w tym miejscu zmienimy „0-0” na „$” a będziemy otrzymywać już tylko nowe elementy.
Blokująca natura xread jest możliwa dzięki argumentowi block. Zerknij na kod:
1 2 3 4 5 6 7 8 9 | from redis import Redis redis_connection = Redis(decode_responses=True, db=0) stream_name = 'testowy_strumien' while True: message = redis_connection.xread({stream_name: '$'}, block=10, count=1) print(message) |
i teraz, jeśli wyczyścimy nasza bazę (polecenie FLUSHALL na redis-cli) i uruchomimy ten program to dostaniemy po prostu co 10ms pustą tablicę. Korzystając z redis-cli możemy wykonać polecenie
xadd testowy_strumien * key val
I nasz program pokaże dodany element.
Ale halo. Stop! Coś tu jest nie tak. Element dodany przez CLI pojawił się tylko raz. A przecież pisałem że nie jest on usuwany ze strumienia! I tak dokładnie jest, tylko „$” pobiera elementy dodane po tym jak wywołamy blokujące xread, więc naszego już nie pokaże.
Dodatkowym elementem o którym trzeba pamiętać jest że tylko pierwsze uruchomienie xread powinno być z „$”. Kolejne powinny zawierać ID ostatnio odczytanej wiadomości. Jeśli nie zrobimy tego, to wiadomości wysłane pomiędzy jednym blokowaniem a drugim – będą nam ginąć. Kod powinien wyglądać tak:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | from redis import Redis redis_connection = Redis(decode_responses=True, db=0) stream_name = 'testowy_strumien' starting_point = "$" while True: message = redis_connection.xread({stream_name: starting_point}, block=10, count=1) if message: message = message[0][1][0] msg_id = message[0] msg_payload = message[1] starting_point = msg_id print(msg_payload, msg_id) |
No dobra, wiemy jak odbierać poprawnie elementy. Ale nadal one zostają w strumieniu. Potwierdzanie przetworzenia, bo o nim mowa, to po prostu wywołaniem metody xack. Niestety, ta metoda jest dostępna tylko w przypadku grupowania konsumentów (programów podłączonych do strumienia). Jeśli chcemy to zrobić bez grupowania zostaje nam brzydkie xdel:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from redis import Redis redis_connection = Redis(decode_responses=True, db=0) stream_name = 'testowy_strumien' starting_point = "$" while True: message = redis_connection.xread({stream_name: starting_point}, block=10, count=1) if message: message = message[0][1][0] msg_id = message[0] msg_payload = message[1] starting_point = msg_id redis_connection.xdel(stream_name, msg_id) print(msg_payload, msg_id) |
Grupowanie konsumentów
Tworzenie grup konsumentów to ficzer pozwalający na większą skalowalność budowanych rozwiązań. Przydatne jeśli ze strumienia czyta więcej niż jedna aplikacja, bo rozwiązuje problem dostarczania tej samej wiadomości do wielu podłączonych konsumentów. Po prostu, w ramach grupy, konsumenci dostają określoną paczkę wiadomości. Tzn że jeśli wiadomość A pójdzie do konsumenta X to Redis gwarantuje że nie pojawi się ona u konsument Y.
Podsumowując
Strumienie w Redisie to wydajne i potężne narzędzie, łączące zalety listy i mechanizmu pub-sub. Przyznam się, że o ile rozwiązania z poprzednich wpisów z tej serii widziałem w produkcyjnym użyciu, to strumieni jeszcze nie widziałem. Po niezobowiązującej zabawie, która wynikła z potrzeby tego wpisu, zachowuję strumienie Redisowe w pamięci – pewnie kiedyś po nie sięgnę.
Mateusz Mazurek