Mateusz Mazurek – programista z pasją

Python, architektura, ciekawostki ze świata IT

Algorytmika Inżynieria oprogramowania Programowanie Programowanie webowe

Redis i Python – dobrze dobrana para #6

Cześć.

Seria artykułów o Redisie i Pythonie to już zbiór 5 wpisów. Pierwsze cztery to wprowadzenie do Redisa, podstawowe sposoby użycia, trochę przydatnych informacji i przede wszystkim – podstawowe typy danych. Piąty artykuł dotyczy mechanizmu publish-subscribe. Niżej zostawiam linki, żeby osoby, które dopiero do mnie dotarły, nie zgubiły całości:

Wnikliwi czytelnicy łatwo zauważą, że Redis jak na razie dostarcza dwie możliwości przesyłania komunikatów:

  • mechanizm blokujących list
  • mechanizm publish-subscribe

Dziś pokażę trzeci sposób. Jest on dość nowym ficzerem, pojawił się w wersji 5.0 (gdzie najnowsza stabilna wersja w momencie pisania tego artykułu to 6.0.8)

Czym są strumienie?

Strumienie to typ danych który pracuje w trybie „append-only”. Znaczy to tylko tyle, że wiadomości które „wlejemy” do strumienia lądują na jego końcu. Nie możemy dodawać elementów w konkretne miejsce strumienia. W odróżnieniu od list i pub-sub, strumienie pozwalają na przesyłanie danych nieco bardziej ustrukturyzowanych niż stringi. Każda wiadomość w strumieniu ma swoje unikalne ID, generowane przez Redisa lub ustawione przez użytkownika.

Podstawowe użycie

Podstawową komendą używaną do pracy ze strumieniami są polecenia XADD i XREAD. Taki kod:

1
2
3
4
5
6
7
8
9
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

redis_connection.xadd(stream_name, {'testowy_klucz': 'testowa_wartosc'})
message = redis_connection.xread({stream_name: '0-0'}, block=None, count=1)
print(message)

doda do strumienia słownik i odczyta z niego, podaną w parametrze count ilość elementów. Parametr block określa czy i jeśli tak, to na ile milisekund, funkcja ta ma być blokując. Umożliwia to używanie jej w pętli. Nazwa strumienia do którego się podłączamy to słownik, którego kluczem jest nazwa strumienia a wartością ID od którego chcemy dane odczytywać. W tym przypadku – od początku.

Efektem wykonania się tego programu będzie:

[['testowy_strumien', [('1599850879420-0', {'testowy_klucz': 'testowa_wartosc'})]]]

To co ważne w tym outpucie to identyfikator 1599850879420-0. Jak napisałem wcześniej, każdy element strumienia posiada swój identyfikator, jego budowa przedstawia się tak:

a przekazując do metody xadd dodatkowy parametr id, możemy ten identyfikator nadpisać. W ogromnej ilości przypadków nie będzie to konieczne.


Czekaj, stop!

Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:
a w ramach prezentu otrzymasz całkowicie za darmo, dwa dokumenty PDF „6 (nie zawsze oczywistych) błędów popełnianych podczas nauki programowania” który jest jednym z efektów ponad siedmioletniej pracy i obserwacji rozwoju niejednego programisty oraz „Wstęp do testowania w Pythonie”, będący wprowadzeniem do biblioteki PyTest.
Jeśli to Cię interesuje to zapraszam również na swoje social media.

Jak i do ewentualnego postawienia mi kawy :)
Postaw mi kawę na buycoffee.to

Trochę bardziej zaawansowane użycie

Dane ze strumienia nie są kasowane. Ma to kilka konsekwencji. Jedną z nich jest, że jeśli program z poprzedniego „rozdziału” zostanie uruchomiony np. 5 razy to zawsze będzie odczytywana ze strumienia pierwsza dodana do niego wartość, mimo że xadd będzie nowe wartości dodawał. Łatwo to potwierdzić, uruchom program np. 5 razy i potem zmień count na 5 i dostajesz w odpowiedzi 5 ostatnich elementów strumienia. Jest to zachowanie pożądane, ponieważ musimy jawnie oznaczyć wiadomość jako przetworzoną. Sprawia to że żaden niechciany exception nie spowoduje, że elementy będą nam ginąć.

Idąc dalej – nie zawsze chcemy zaczynać otrzymywać elementy ze strumienia od początku jego istnienia. Faktem jest, że w poprzednim kawałku kodu, jawnie zdefiniowaliśmy „0-0”, czyli chęć otrzymywania zawartości od początku. Wystarczy że w tym miejscu zmienimy „0-0” na „$” a będziemy otrzymywać już tylko nowe elementy.

Blokująca natura xread jest możliwa dzięki argumentowi block. Zerknij na kod:

1
2
3
4
5
6
7
8
9
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

while True:
    message = redis_connection.xread({stream_name: '$'}, block=10, count=1)
    print(message)

i teraz, jeśli wyczyścimy nasza bazę (polecenie FLUSHALL na redis-cli) i uruchomimy ten program to dostaniemy po prostu co 10ms pustą tablicę. Korzystając z redis-cli możemy wykonać polecenie

xadd testowy_strumien * key val

I nasz program pokaże dodany element.

Ale halo. Stop! Coś tu jest nie tak. Element dodany przez CLI pojawił się tylko raz. A przecież pisałem że nie jest on usuwany ze strumienia! I tak dokładnie jest, tylko „$” pobiera elementy dodane po tym jak wywołamy blokujące xread, więc naszego już nie pokaże.

Dodatkowym elementem o którym trzeba pamiętać jest że tylko pierwsze uruchomienie xread powinno być z „$”. Kolejne powinny zawierać ID ostatnio odczytanej wiadomości. Jeśli nie zrobimy tego, to wiadomości wysłane pomiędzy jednym blokowaniem a drugim – będą nam ginąć. Kod powinien wyglądać tak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

starting_point = "$"

while True:
    message = redis_connection.xread({stream_name: starting_point}, block=10, count=1)
    if message:
        message = message[0][1][0]
        msg_id = message[0]
        msg_payload = message[1]
        starting_point = msg_id
        print(msg_payload, msg_id)

No dobra, wiemy jak odbierać poprawnie elementy. Ale nadal one zostają w strumieniu. Potwierdzanie przetworzenia, bo o nim mowa, to po prostu wywołaniem metody xack. Niestety, ta metoda jest dostępna tylko w przypadku grupowania konsumentów (programów podłączonych do strumienia). Jeśli chcemy to zrobić bez grupowania zostaje nam brzydkie xdel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

starting_point = "$"

while True:
    message = redis_connection.xread({stream_name: starting_point}, block=10, count=1)
    if message:
        message = message[0][1][0]
        msg_id = message[0]
        msg_payload = message[1]
        starting_point = msg_id
        redis_connection.xdel(stream_name, msg_id)
        print(msg_payload, msg_id)

Grupowanie konsumentów

Tworzenie grup konsumentów to ficzer pozwalający na większą skalowalność budowanych rozwiązań. Przydatne jeśli ze strumienia czyta więcej niż jedna aplikacja, bo rozwiązuje problem dostarczania tej samej wiadomości do wielu podłączonych konsumentów. Po prostu, w ramach grupy, konsumenci dostają określoną paczkę wiadomości. Tzn że jeśli wiadomość A pójdzie do konsumenta X to Redis gwarantuje że nie pojawi się ona u konsument Y.

Podsumowując

Strumienie w Redisie to wydajne i potężne narzędzie, łączące zalety listy i mechanizmu pub-sub. Przyznam się, że o ile rozwiązania z poprzednich wpisów z tej serii widziałem w produkcyjnym użyciu, to strumieni jeszcze nie widziałem. Po niezobowiązującej zabawie, która wynikła z potrzeby tego wpisu, zachowuję strumienie Redisowe w pamięci – pewnie kiedyś po nie sięgnę.

Dzięki za wizytę,
Mateusz Mazurek

A może wolisz nowości na mail?

Subskrybuj
Powiadom o
guest

Witryna wykorzystuje Akismet, aby ograniczyć spam. Dowiedz się więcej jak przetwarzane są dane komentarzy.

2 komentarzy
Inline Feedbacks
View all comments