Redis i Python – dobrze dobrana para #6

Cześć.

Seria artykułów o Redisie i Pythonie to już zbiór 5 wpisów. Pierwsze cztery to wprowadzenie do Redisa, podstawowe sposoby użycia, trochę przydatnych informacji i przede wszystkim – podstawowe typy danych. Piąty artykuł dotyczy mechanizmu publish-subscribe. Niżej zostawiam linki, żeby osoby, które dopiero do mnie dotarły, nie zgubiły całości:

Wnikliwi czytelnicy łatwo zauważą, że Redis jak na razie dostarcza dwie możliwości przesyłania komunikatów:

  • mechanizm blokujących list
  • mechanizm publish-subscribe

Dziś pokażę trzeci sposób. Jest on dość nowym ficzerem, pojawił się w wersji 5.0 (gdzie najnowsza stabilna wersja w momencie pisania tego artykułu to 6.0.8)

Czym są strumienie?

Strumienie to typ danych który pracuje w trybie „append-only”. Znaczy to tylko tyle, że wiadomości które „wlejemy” do strumienia lądują na jego końcu. Nie możemy dodawać elementów w konkretne miejsce strumienia. W odróżnieniu od list i pub-sub, strumienie pozwalają na przesyłanie danych nieco bardziej ustrukturyzowanych niż stringi. Każda wiadomość w strumieniu ma swoje unikalne ID, generowane przez Redisa lub ustawione przez użytkownika.

Podstawowe użycie

Podstawową komendą używaną do pracy ze strumieniami są polecenia XADD i XREAD. Taki kod:

1
2
3
4
5
6
7
8
9
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

redis_connection.xadd(stream_name, {'testowy_klucz': 'testowa_wartosc'})
message = redis_connection.xread({stream_name: '0-0'}, block=None, count=1)
print(message)

doda do strumienia słownik i odczyta z niego, podaną w parametrze count ilość elementów. Parametr block określa czy i jeśli tak, to na ile milisekund, funkcja ta ma być blokując. Umożliwia to używanie jej w pętli. Nazwa strumienia do którego się podłączamy to słownik, którego kluczem jest nazwa strumienia a wartością ID od którego chcemy dane odczytywać. W tym przypadku – od początku.

Efektem wykonania się tego programu będzie:

[['testowy_strumien', [('1599850879420-0', {'testowy_klucz': 'testowa_wartosc'})]]]

To co ważne w tym outpucie to identyfikator 1599850879420-0. Jak napisałem wcześniej, każdy element strumienia posiada swój identyfikator, jego budowa przedstawia się tak:

a przekazując do metody xadd dodatkowy parametr id, możemy ten identyfikator nadpisać. W ogromnej ilości przypadków nie będzie to konieczne.


Czekaj, stop!

Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:

Aby potwierdzić swoją subskrypcję, odbierz pocztę i kliknij w link potwierdzający:) jeśli maila nie ma to poczekaj chwile i/lub sprawdź folder spam/inne/oferty itp :)

Aby potwierdzić swoją subskrypcję, odbierz pocztę i kliknij w link potwierdzający:) jeśli maila nie ma to poczekaj chwile i/lub sprawdź folder spam/inne/oferty itp :)
a w ramach prezentu otrzymasz całkowicie za darmo, dwa dokumenty PDF „6 (nie zawsze oczywistych) błędów popełnianych podczas nauki programowania” który jest jednym z efektów ponad siedmioletniej pracy i obserwacji rozwoju niejednego programisty oraz „Wstęp do testowania w Pythonie”, będący wprowadzeniem do biblioteki PyTest.
Jeśli to Cię interesuje to zapraszam również na swoje social media.

Jak i do ewentualnego postawienia mi kawy :)

Trochę bardziej zaawansowane użycie

Dane ze strumienia nie są kasowane. Ma to kilka konsekwencji. Jedną z nich jest, że jeśli program z poprzedniego „rozdziału” zostanie uruchomiony np. 5 razy to zawsze będzie odczytywana ze strumienia pierwsza dodana do niego wartość, mimo że xadd będzie nowe wartości dodawał. Łatwo to potwierdzić, uruchom program np. 5 razy i potem zmień count na 5 i dostajesz w odpowiedzi 5 ostatnich elementów strumienia. Jest to zachowanie pożądane, ponieważ musimy jawnie oznaczyć wiadomość jako przetworzoną. Sprawia to że żaden niechciany exception nie spowoduje, że elementy będą nam ginąć.

Idąc dalej – nie zawsze chcemy zaczynać otrzymywać elementy ze strumienia od początku jego istnienia. Faktem jest, że w poprzednim kawałku kodu, jawnie zdefiniowaliśmy „0-0”, czyli chęć otrzymywania zawartości od początku. Wystarczy że w tym miejscu zmienimy „0-0” na „$” a będziemy otrzymywać już tylko nowe elementy.

Blokująca natura xread jest możliwa dzięki argumentowi block. Zerknij na kod:

1
2
3
4
5
6
7
8
9
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

while True:
  message = redis_connection.xread({stream_name: '$'}, block=10, count=1)
  print(message)

i teraz, jeśli wyczyścimy nasza bazę (polecenie FLUSHALL na redis-cli) i uruchomimy ten program to dostaniemy po prostu co 10ms pustą tablicę. Korzystając z redis-cli możemy wykonać polecenie

xadd testowy_strumien * key val

I nasz program pokaże dodany element.

Ale halo. Stop! Coś tu jest nie tak. Element dodany przez CLI pojawił się tylko raz. A przecież pisałem że nie jest on usuwany ze strumienia! I tak dokładnie jest, tylko „$” pobiera elementy dodane po tym jak wywołamy blokujące xread, więc naszego już nie pokaże.

Dodatkowym elementem o którym trzeba pamiętać jest że tylko pierwsze uruchomienie xread powinno być z „$”. Kolejne powinny zawierać ID ostatnio odczytanej wiadomości. Jeśli nie zrobimy tego, to wiadomości wysłane pomiędzy jednym blokowaniem a drugim – będą nam ginąć. Kod powinien wyglądać tak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

starting_point = "$"

while True:
  message = redis_connection.xread({stream_name: starting_point}, block=10, count=1)
  if message:
      message = message[0][1][0]
      msg_id = message[0]
      msg_payload = message[1]
      starting_point = msg_id
      print(msg_payload, msg_id)

No dobra, wiemy jak odbierać poprawnie elementy. Ale nadal one zostają w strumieniu. Potwierdzanie przetworzenia, bo o nim mowa, to po prostu wywołaniem metody xack. Niestety, ta metoda jest dostępna tylko w przypadku grupowania konsumentów (programów podłączonych do strumienia). Jeśli chcemy to zrobić bez grupowania zostaje nam brzydkie xdel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from redis import Redis

redis_connection = Redis(decode_responses=True, db=0)

stream_name = 'testowy_strumien'

starting_point = "$"

while True:
  message = redis_connection.xread({stream_name: starting_point}, block=10, count=1)
  if message:
      message = message[0][1][0]
      msg_id = message[0]
      msg_payload = message[1]
      starting_point = msg_id
      redis_connection.xdel(stream_name, msg_id)
      print(msg_payload, msg_id)

Grupowanie konsumentów

Tworzenie grup konsumentów to ficzer pozwalający na większą skalowalność budowanych rozwiązań. Przydatne jeśli ze strumienia czyta więcej niż jedna aplikacja, bo rozwiązuje problem dostarczania tej samej wiadomości do wielu podłączonych konsumentów. Po prostu, w ramach grupy, konsumenci dostają określoną paczkę wiadomości. Tzn że jeśli wiadomość A pójdzie do konsumenta X to Redis gwarantuje że nie pojawi się ona u konsument Y.

Podsumowując

Strumienie w Redisie to wydajne i potężne narzędzie, łączące zalety listy i mechanizmu pub-sub. Przyznam się, że o ile rozwiązania z poprzednich wpisów z tej serii widziałem w produkcyjnym użyciu, to strumieni jeszcze nie widziałem. Po niezobowiązującej zabawie, która wynikła z potrzeby tego wpisu, zachowuję strumienie Redisowe w pamięci – pewnie kiedyś po nie sięgnę.

Dzięki za wizytę,
Mateusz Mazurek
Mateusz M.

Ostatnie wpisy

Podsumowanie: maj, czerwiec, lipiec i sierpień 2024

Oj daaawnoo mnie tu nie było. Ale wakacje to był czas dużej liczby intensywnych wyjazdów i tak naprawdę, dopiero jakoś… Read More

2 miesiące ago

Podsumowanie: kwiecień 2024

Cześć! Zapraszam na krótkie podsumowanie kwietnia. Wyjazd do Niemiec A dokładniej pod granicę z Francją. Chrześnica miała pierwszą komunię. Po… Read More

6 miesięcy ago

Podsumowanie: luty i marzec 2024

Ostatnio tygodnie były tak bardzo wypełnione, że nie udało mi się napisać nawet krótkiego podsumowanie. Więc dziś zbiorczo podsumuję luty… Read More

7 miesięcy ago

Podsumowanie: styczeń 2024

Zapraszam na krótkie podsumowanie miesiąca. Książki W styczniu przeczytałem "Homo Deus: Historia jutra". Książka łudząco podoba do wcześniejszej książki tego… Read More

9 miesięcy ago

Podsumowanie roku 2023

Cześć! Zapraszam na podsumowanie roku 2023. Książki Zacznijmy od książek. W tym roku cel 35 książek nie został osiągnięty. Niemniej… Read More

10 miesięcy ago

Podsumowanie: grudzień 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W grudniu skończyłem czytać Mein Kampf. Nudna książka. Ciekawsze fragmenty można by było streścić… Read More

10 miesięcy ago