Przesyłanie plików przez WebSocket i pętla zdarzeń jako serwer – gevent
Hello!
Tym razem zacznę nieco inaczej – bo tym razem nie zacznę od „ale dawno tu nie pisałem” ;) Dziś kolejna pokazówka możliwości biblioteki gevent.
Napiszemy program który wysyła plik na serwer poprzez websocket, serwer sprawdza czy plik nie ma wirusów i odsyła status. Czemu websocket a nie HTTP? Bo na HTTP przykład znajdziecie wszędzie :P
Może najpierw przyjrzyjmy się temu jak wysyłany jest plik na serwer w protokole HTTP. Protokół ten używa wygenerowanego przez siebie ciągu znaków który jest granicą (boundary) pomiędzy polami formularza. Poniżej zrzut z uploadu via HTTP:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | POST /upload?upload_progress_id=12344 HTTP/1.1 Host: localhost:3000 Content-Length: 1325 Origin: http://localhost:3000 ... other headers ... Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryePkpFF7tjBAqx29L ------WebKitFormBoundaryePkpFF7tjBAqx29L Content-Disposition: form-data; name="MAX_FILE_SIZE" 100000 ------WebKitFormBoundaryePkpFF7tjBAqx29L Content-Disposition: form-data; name="uploadedfile"; filename="hello.o" Content-Type: application/x-object ... contents of file goes here ... ------WebKitFormBoundaryePkpFF7tjBAqx29L-- |
W nagłówku Content-Type jest informacja o tym że wysyłamy dane jak multipart/form-data czyli właśnie oddzielone tą granicą o której wcześniej wspomniałem. Jej wartość jest również doklejona do Content-Type i równa „—-WebKitFormBoundaryePkpFF7tjBAqx29L” dla tego przykładu.
Następnie lecą już dane oddzielone własnie tą granicą. Z tego powodu serwer HTTP wie kiedy przestać czytać bajty.
W websockecie nie mamy zaimplementowanego jako takiego przesyłania plików – musimy sobie to sami zaimplementować. Dodatkowo warto wspomnieć iż mimo że Websocket do zestawienia połączenia z serwerem używa protokołu HTTP to sam w sobie jest protokołem połączeniowym i dwukierunkowym (bidirectional). Co znaczy że, w odróżnieniu od HTTP, klient może wysyłać dane ciągle w czasie i nie otrzymywać odpowiedzi żadnej. Może też ciągle otrzymywać jakieś powiadomienia z serwera i nie koniecznie na nie odpowiadać – połączenie trwa do momentu opuszczenia strony lub jawnego jego zerwania. A to znaczy że musimy jakoś serwer powiadomić że teraz zaczynamy wysyłać plik, wysyłać go i jeszcze powiadomić serwer że transfer się zakończył.
Protokół jaki zaimplementujemy:
Zacznijmy więc od najprostszej rzeczy, tj kawałka HTMLa:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | <html> <head> <style> .progress{ width: 40%; } </style> <script src="websocker_sender.js"></script> <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css"> <link href="//netdna.bootstrapcdn.com/bootstrap/3.0.0/css/bootstrap-glyphicons.css" rel="stylesheet"> </head> <body> <form action="" type="multipart/form-data"> <input type="file" multiple="multiple" /> <input type="submit" value="Uploader" /> </form> <div id="progresses"> </div> </body></html> |
Jak widać dodaliśmy boostrapa, jQuery i jeden plik który teraz zaimplementujemy, zaczynając od obiektu:
1 2 3 4 5 6 7 8 9 10 11 12 13 | var WebSockerSender = function(options){ this.config = options; if (!('blockSize' in options)) this.config.blockSize = 1024 //defaults this.currentByte = 0; this.filename = this.config.file.name; this.size = this.config.file.size; this.onSuccess = function(){} this.onError = function(){} this.onProgress = function (){} }; |
Property blockSize to długość jednej paczki przesyłanych bajtów, currentByte to bajt od którego mamy zacząć wysyłać aktualną paczkę, więc na początek będziemy wysyłać od currentByte (0) do currentByte + blockSize (1024). Teraz metoda rozpoczynająca upload:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | this.startUpload = function () { this.ws = new WebSocket(this.config.url); this.ws.senderObject = this; this.ws.onmessage = this.onMessage; var that = this this.ws.onopen = function (params) { this.senderObject.ws.send(JSON.stringify({ 'filename': this.senderObject.filename, 'size': this.senderObject.size, 'type': 'START' })); } }; |
Otwieramy tu połączenie websocketowe i jeśli się ono zestawi (handshake się zakończy) to wysyłamy info (zgodnie z wyżej przestawionym diagramem) że inicjujemy upload. Teraz serwer odpowie nam że jest gotów, wiec zaimplementujmy to w funkcji onMessage:
1 2 3 4 5 6 7 8 9 10 11 12 | this.onMessage = function (params) { var data = JSON.parse(params.data); if (data.type == 'DATA' && data.code == 200){ this.senderObject.sendData(); this.senderObject.currentByte += this.senderObject.config.blockSize; this.senderObject.onProgress(this.senderObject.filename, this.senderObject.currentByte, this.senderObject.size); } else if (data.type == 'STATUS') { this.senderObject.onSuccess(this.senderObject.filename, data.status); this.senderObject.ws.close(); } }; |
Jeśli otrzymaliśmy od serwera type = 'DATA’ to wysyłamy bajty funkcją którą za chwilkę zaimplementujemy, potem przesuwamy wskaźnik o którym pisałem wcześniej i uruchamiany callback „onProgress”.
No i jeszcze funkcja sendData:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | this.sendData = function(){ if (this.currentByte + this.config.blockSize > this.size){ this.config.blockSize = this.size - this.currentByte; } if (this.currentByte == this.size) { this.ws.send(JSON.stringify({ 'type': 'DONE' })) return; } var blob = this.config.file.slice(this.currentByte, this.currentByte + this.config.blockSize); this.reader = new FileReader(); var self = this; this.reader.onabort = function() { console.log('reader: abort') }; this.reader.onerror = function(event) { switch(event.target.error.code) { case event.target.error.NOT_FOUND_ERR: console.log('File not found'); break; case event.target.error.NOT_READABLE_ERR: console.log('File is not readable'); break; case event.target.error.ABORT_ERR: console.log('File upload aborted'); break; default: console.log('An error occurred reading the file.'); }; }; this.reader.onloadend = function(event) { self.ws.send(window.btoa(event.target.result)); }; this.reader.readAsBinaryString(blob); }; |
Sprawdzamy pierw czy jeśli wyślemy paczkę bajtów o standardowej długości to przekroczymy wielkość pliku i jeśli tak to ustawiamy długość ostatniej paczki na tyle ile bajtów zostało do przesłania, dalej sprawdzamy czy wskaźnik jest równy wielkości pliku – jeśli tak to upload się zakończył – informujemy o tym serwer i przestajemy wysyłać dane, nie zamykając połączenia z websocketem – będzie jeszcze info o tym czy plik zawiera wirusy. I na koniec dodajemy możliwość ustawienia callbacków:
1 2 3 4 5 6 7 8 9 10 11 | WebSockerSender.prototype.setSuccessCallback = function (func){ this.onSuccess = func; }; WebSockerSender.prototype.setErrorCallback = function (func){ this.onError = func; }; WebSockerSender.prototype.setOnProgressCallback = function (func){ this.onProgress = func; }; |
Dodamy teraz użycie tej klasy do pliku HTML:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | <script> var createUploadProgressBar = function(filename){ var tpl = '<div class="progress"> <div class="progress-bar" id="'+filename+'" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width:0%"><span>0%</span><span id="'+filename+'_status"></span></div></div>'; $("#progresses").append(tpl); }; var updateProgressBar = function(filename, current, size){ var value = parseInt(( current / size ) * 100) + '%'; var progressBar = document.getElementById(btoa(filename)); progressBar.style.width = value; progressBar.children[0].innerHTML = value; }; var showStatus = function (filename, status){ var statusIcon = document.getElementById(btoa(filename)+'_status'); statusIcon.className = status == ('./' + filename + ': OK') ? 'glyphicon glyphicon-ok' : 'glyphicon glyphicon-remove'; }; $(document).ready(function() { var socketServerUrl = 'ws://localhost:8000/' var files = []; $('input[type=file]').change(function(event) { files = event.target.files; }); $('form').submit(function() { // Checks user has selected one or more files if( files.length == 0 ) { alert('select files first !'); return false; } for(var i=0; i < files.length; i++) { createUploadProgressBar(btoa(files[i].name)); var transfer = new WebSockerSender({ file: files[i], url: socketServerUrl }); transfer.onProgress = updateProgressBar; transfer.onSuccess = showStatus; transfer.startUpload(); } return false; }); }); </script> |
Zauważ że tutaj się sporo zadziało – dla każdego uploadowanego pliku tworzę pasek postępu – jest to div z ID równym nazwie pliku w base64 (aby uniknąć problemów z kropka pomiędzy rozszerzeniem a właściwą nazwą pliku) oraz elemennt tworzony również z nazwą pliku w base64 ale z doklejonym sufixem „_status”, gdzie będziemy wrzucać status pliku, tj info o tym czy jest zainfekowany czy nie. Wpiąłem się też odpowiednimi funkcjami które będą manipulować wartości tych divów do callbacków naszej klasy.
No i część kliencką skończyliśmy – teraz czas na Pythona, który wepnie się we wcześniej przestawiony flow – aby móc odbierać wiele plików jednocześnie użyjemy biblioteki gevent która jest pętlą zdarzeń – więcej o niej można przeczytać w moim poprzednim wpisie – klik
Gevent pozwala na uruchomienie każdego połączenia do websocketu w nowym greenlecie. Zaczniemy od szkieletu obsługi WebSocketu:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | import gevent from gevent.monkey import patch_all patch_all() def setup(): from geventwebsocket import WebSocketServer, WebSocketApplication, Resource class WSUploader(WebSocketApplication): def on_open(self): print "Connection opened" def on_message(self, message): print "Message" def on_close(self, reason): print "Connection done" WebSocketServer( ('', 8000), Resource({'/': WSUploader}) ).serve_forever() setup() |
Wydzielimy część odpowiedzialną za same zbieranie danych które przyszły do serwera:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | class WebSocketFileReceiver: def init(self): self.data = [] def save_filename(self, name): self.filename = name def collect_data(self, data): raw_data = b64decode(data) self.data.append(raw_data) return len(raw_data) def finish(self): with open('./' + self.filename, 'w') as file: file.write(''.join(self.data)) del self.data |
I niech nasza klasa WSUploader dziedziczy po owe klasie. I jeszcze klasa sprawdzająca plik pod kątem wirusów:
1 2 3 4 5 6 7 8 9 10 | class AntyVirusChecker(gevent.Greenlet): def __init__(self, filename, callback): self.filename = filename self.link(callback) gevent.Greenlet.__init__(self) def _run(self): sub = Popen(['clamscan ./' + self.filename], stdout=PIPE, shell=True) out, err = sub.communicate() return out |
która będzie również oddzielnym greenletem – tutaj jest ciekawa rzecz zrobiona, ponieważ dziedzicząc po klasie gevent.Greenlet możemy stworzyć własną klasę greenletu – wygodne rozwiązanie to jest – punktem wejścia jest przedefiniowanie funkcji _run(). W konstruktorze użyłem funkcji 'link’ która wykona się jak greenlet zakończy swoje działanie. W run() uruchamiamy program clamscan który sprawdzi nam plik przekazany w parametrze pod kątem wirusów.
Teraz zdefiniujmy właściwą funkcję obsługującą połączenia z klientem:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | class WSUploader(WebSocketApplication, WebSocketFileReceiver): def on_open(self): print "Connection opened" def notify_about_result(self, result): print "File checked, sending response" txt_result = result.get() status = txt_result.split('\n')[0] self.ws.send(dumps({ 'filename': self.filename, 'status': status, 'type': 'STATUS' })) def on_message(self, message): try: msg = loads(str(message)) type = msg['type'] except ValueError: type = 'RAW_DATA' if type == 'DONE': self.finish() print "Upload finished. Delegating to check..." AntyVirusChecker(self.filename, self.notify_about_result).start() elif type == 'START': self.init() data = { "type": "DATA", "message": "Upload initialized. Wait for data", "code": 200 } self.save_filename(msg['filename']) self.ws.send(dumps(data)) elif not self.ws.closed and type == 'RAW_DATA': collected_length = self.collect_data(message) data = { "type": "DATA", "code": 200, "bytesRead": collected_length } self.ws.send(dumps(data)) def on_close(self, reason): print "Connection done" |
W gruncie rzeczy kod jest prosty – w funkcji run() dostajemy wiadomość z frontu klientem – próbujemy go potraktować jako JSON – jeśli się nie udaje to zakładamy że to są dane binarne. Jeśli dostaniemy typ „START” to w sumie robimy tylko tyle że zapisujemy sobie nazwę pliku w pamięci i odsyłamy ustalony wcześniej tekst (json). Jeśli typem jest „DONE” to zapisujemy pobrany plik na dysku i delegujemy sprawdzanie czy jest on zainfekowany do osobnego greenletu – tego który stworzyliśmy wcześniej. W jego konstruktorze przekazujemy wskaźnik do funkcji która jest przekazana do funkcji link() – spowoduje to uruchomienie jej gdy plik zostanie sprawdzony – a ona sama wyśle po prostu info o tym do frontu.
Clamscan zwraca rezultat na standardowe wyjście w formie:
1 2 3 4 5 6 7 8 9 10 11 | 0801E0.jpg: OK ----------- SCAN SUMMARY ----------- Known viruses: 6385286 Engine version: 0.99.2 Scanned directories: 0 Scanned files: 1 Infected files: 0 Data scanned: 0.03 MB Data read: 0.03 MB (ratio 1.00:1) Time: 18.792 sec (0 m 18 s) |
Więc my odsyłamy na front tylko pierwszą linię.
Oczywiście kod nie jest idealny i raczej nie nadaje się na produkcję. Po pierwsze pliki są zapisywany tymczasowo w pamięci – powinny być pchane od razu do pliku. Teraz zauważyłem też że wskaźnik na froncie przesuwam nie o zwrócona ilość bajtów a o wielkość bloku co jest bez sensem – przesuwanie o zwróconą ilość bajtów pozwala na prostą obsługę retransmisji.
Załączam demo w formie gifa:
[WPGP gif_id=”2347″ width=”100%”]
Mateusz Mazurek