Przesyłanie plików przez WebSocket z backendem w asyncio
Heloł:)
Artykuł ten jest trochę kontynuacją/rozszerzeniem tego wpisu – link – więc zanim przejdziesz dalej – sugeruję przeczytać go, bo wprowadza w temat problemu który rozwiązujemy. Oczywiście część informacji powtórzę, ale jeśli masz czas – to warto tam zajrzeć.
Skoro organizacyjne rzeczy mamy za sobą, czas przejść do sedna.
Skąd pomysł na ten artykuł?
Wpis który zlinkowałem wyżej pokazuje jak możemy napisać usługę która wykorzystując technologię WebSocket pozwala na upload pliku na serwer + sprawdzenie pliku programem antywirusowym i odesłanie statusu. Wartością dodaną jest oczywiście stworzenie własnego protokołu „on top of WS”.
I jestem oczywiście zadowolony z tego wpisu:) Z jednym, małym, malutkim.. ALE.
Post tamten był pisany na początku roku 2018 co znaczy że się troszkę przedawnił – a dokładniej, rozwiązanie którego użyłem na backendzie, tj. gevent – w świetle tego że w bibliotece standardowej zagościło inne rozwiązanie które pozwala zaimplementować pętlę zdarzeń – nie już takie „sexy” ;)
Co takiego pojawiło się w bibliotece standardowej?
Po prawdzie to, asyncio, bo tak nazywa się biblioteka o której jest dzisiejszy wpis, jest w stdlibie już od Pythona 3.4, to relatywnie od niedawna nadaje się do użytku.
Asyncio to biblioteka pozwalająca na asynchroniczne programowanie w ramach pętli zdarzeń. Owa pętla zdarzeń, to w pewnym uproszczeniu, pętla która pozwala na „równoległe” wykonywanie się zadań. Schemat działania wygląda tak:
W nomenklaturze asyncio, Request który jest na schemacie, nazywany jest korutyną (ang. coroutine), natomiast w gevencie – greenletem. Warto tu dodać że istnieje tłumaczenie słowa coroutine i brzmi ono „współprogram”, z tym że nie spotkałem się z jego powszechnym użyciem, co jest powodem tego że w tym wpisie będzie spolszczone słowo – korutyna. Całość koncepcji pętli zdarzeń (ang. event loop) polega na tym że kolejne korutyny wchodzą do pętli i są wykonywane, jednakże każde żądanie I/O (input / output) np. zapytanie do bazy, zapis pliku, komunikacji via http z zewnętrznym serwerem, powoduje że korutyna taka nie blokuje całej pętli, dając możliwość wykonania się innej korutynie, jeśli wróci odpowiedź z żądania I/O, korutyna powróci do pętli i zacznie wykonywać się od momentu w którym skończyła.
Asyncio wprowadza do definicji korutyn słowa kluczowe async i await – dokładniej jest to tzw. „sugar syntax” – element składni języka który służy tylko wygodzie programisty i przykrywa, w tym przypadku, odpowiednie dekoratory. To pierwsze definiuje nam że funkcja jest korutyną a await sprawia że korutyna jest przerywana i inna przejmuje wątek. Bo cała pętla zdarzeń to jeden wątek. Przykład korutyny:
1 2 3 4 5 6 7 8 9 10 | import asyncio async def main(): ... print('hello') ... await asyncio.sleep(1) ... print('world') asyncio.run(main()) hello world |
Po więcej informacji a także kilka fajnych wykresów odsyłam na tę stronę.
Wracając do naszego przykładu..
Zgodnie z tym co było w moim artykule, które zlinkowałem na początku wpisu, zaimplementujemy dziś serwer WebSocket’owy który pozwoli na zapis plików na serwer oraz sprawdzenie ich programem ClamAV. Przypominam nasz protokół:
Pisząc implementacje w asyncio zachowany ten protokół, dzięki czemu kod JSowy na froncie (pokazany w artykule z geventem) nie zmieni się :) taki oto plus konsekwentnego programowania.
Czas na kod
Nim zaczniemy sobie pisać kod, muszę dodać że wpis z geventem był pod Python’a 2.7 – dziś piszemy pod Pythona 3.7, więc poza tym że będą słowa kluczowe async/await to spotkacie się również z definicją typów, zamianą słowa kluczowego print na funkcję oraz z tym że Python 3.7 ma osobny typ – bytes.
Czekaj, stop!
Podoba Ci się to co tworzę? Jeśli tak to zapraszam Cię do zapisania się na newsletter:Jeśli to Cię interesuje to zapraszam również na swoje social media.
Jak i do ewentualnego postawienia mi kawy :)
Aby móc pisać kod należy dociągnąć dwie zależności:
pip install aiohttp aiofiles
Pierwsza do obsługi protokołu WebSocket druga do zapisu pliku na dysk. Widać tu wadę asyncio – jeśli chcemy użyć biblioteki, konieczne jest by była await’owana. Nie zawsze idzie znaleźć odpowiednio stabilne rozwiązanie. W gevencie działało to nieco inaczej – on patchował sobie bibliotekę standardową a dokładnie sockety, pliki itp. – dzięki czemu prawie każda biblioteka była od razu „gevent-friendly”.
No ale już finalnie przechodząc do kodu, stwórzmy klasę która będzie realizować serwer WS:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | class WSServer(WebSocketFileReceiver): async def websocket_handler(self, request) -> web.WebSocketResponse: self.ws = web.WebSocketResponse() await self.ws.prepare(request) async for msg in self.ws: if msg.type == WSMsgType.TEXT: await self.handle_msg(msg) elif msg.type == WSMsgType.ERROR: print('ws connection closed with exception %s' % self.ws.exception()) print('websocket connection closed') return self.ws async def handle_msg(self, msg) -> None: if msg.data == 'close': await self.ws.close() else: try: msg = loads(str(msg.data)) type = msg['type'] except ValueError: type = 'RAW_DATA' print('Received type {}'.format(type)) if type == 'DONE': await self.finish() print("Upload finished. Delegating to check...") create_task(AntiVirusChecker(self.filename, self.notify_about_result).run()) elif type == 'START': self.init() data = { "type": "DATA", "message": "Upload initialized. Wait for data", "code": 200 } self.save_filename(msg['filename']) await self.ws.send_str(dumps(data)) elif not self.ws.closed and type == 'RAW_DATA': collected_length = self.collect_data(msg.data) data = { "type": "DATA", "code": 200, "bytesRead": collected_length } await self.ws.send_str(dumps(data)) async def notify_about_result(self, result: bytes) -> None: print ("File checked, sending response") txt_result = result.decode("utf-8") status = txt_result.split('\n')[0] await self.ws.send_str(dumps({ 'filename': self.filename, 'status': status, 'type': 'STATUS' })) def __call__(self) -> None: app = web.Application() app.add_routes([web.get('/', self.websocket_handler)]) web.run_app(app) |
Schemat kolekcjonowania danych jest taki sam jak w poprzednim wpisie, więc ominę jego omówienie a skupię się na async/await. Ogólnie rozstawienie awaitów ma znaczenie, to gdzie program przerwie działanie może mieć znaczenie dla architektury jak i wydajności. Zauważ proszę że odebrania pakietu danych prawie od razu jest skazane na await:
1 | await self.handle_msg(msg) |
powoduje to oddanie sterowania do pętli zdarzeń i obsługę innej korutyny – np. innego połączenia, ale też ewentualnie tej samej, jeśli nie ma innych. Kolejny await jest zależny od typu pakietu. Np. dla typu „RAW_DATA”, dane są zapisywane w pamięci i zwracana jest do klienta informacja o tym ile tych danych zostało zapisane – znów z await:
1 2 3 4 5 6 | data = { "type": "DATA", "code": 200, "bytesRead": collected_length } await self.ws.send_str(dumps(data)) |
Pewnie wnikliwy czytelnik zauważył że klasa ta dziedziczy po innej, tutaj jej ciało:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | class WebSocketFileReceiver: def init(self): self.data = [] def save_filename(self, name: str) -> None: self.filename = name def collect_data(self, data: bytes) -> int: raw_data = b64decode(data) self.data.append(raw_data) return len(raw_data) async def finish(self) -> None: async with aiof.open('./' + self.filename, "wb") as out: await out.write(b''.join(self.data)) await out.flush() del self.data |
Nie różni się to dużo od klasy która była w poprzednim artykule, poza formą zapisu do pliku już zebranych danych – tu używamy aiofiles no i mamy typ bytes.
I na sam koniec sprawdzanie plików programem antywirusowym:
1 2 3 4 5 6 7 8 9 10 11 12 13 | class AntiVirusChecker: def __init__(self, filename: str, callback): self.filename = filename self.callback = callback async def run(self) -> None: proc = await create_subprocess_shell( 'clamscan ./' + self.filename, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = await proc.communicate() await self.callback(stdout) |
Do pełni szczęścia brakuje tylko importów
1 2 3 4 5 6 | from json import dumps, loads from base64 import b64decode from aiohttp import web, WSMsgType from asyncio.tasks import create_task from asyncio import create_subprocess_shell, subprocess import aiofiles as aiof |
Co dalej z asyncio?
Asyncio to rozwiązanie które nie jest odpowiednie dla każdego problemu. Zauważ proszę że nie sprawdzi się to w momencie kiedy aplikacja będzie robić skomplikowane obliczenia, natomiast super sprawdzi się jako proxy. W sumie program który napisałem to takie proxy miedzy użytkownikiem który wgrywa plik a komputerem docelowym – przerzuca dane – i do tego asyncio, ogólnie cała pętla zdarzeć, bo to technologia która jest obecna nie tylko w Pythonie, sprawdzi się znakomicie!
Mateusz Mazurek