Mateusz Mazurek – programista z pasją

Python, architektura, ciekawostki ze świata IT

Algorytmika Inżynieria oprogramowania Programowanie Programowanie webowe

Przesyłanie plików przez WebSocket i pętla zdarzeń jako serwer – gevent

Hello!
Tym razem zacznę nieco inaczej – bo tym razem nie zacznę od „ale dawno tu nie pisałem” ;) Dziś kolejna pokazówka możliwości biblioteki gevent.

Napiszemy program który wysyła plik na serwer poprzez websocket, serwer sprawdza czy plik nie ma wirusów i odsyła status. Czemu websocket a nie HTTP? Bo na HTTP przykład znajdziecie wszędzie :P

Może najpierw przyjrzyjmy się temu jak wysyłany jest plik na serwer w protokole HTTP. Protokół ten używa wygenerowanego przez siebie ciągu znaków który jest granicą (boundary) pomiędzy polami formularza. Poniżej zrzut z uploadu via HTTP:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
POST /upload?upload_progress_id=12344 HTTP/1.1
Host: localhost:3000
Content-Length: 1325
Origin: http://localhost:3000
... other headers ...
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryePkpFF7tjBAqx29L

------WebKitFormBoundaryePkpFF7tjBAqx29L
Content-Disposition: form-data; name="MAX_FILE_SIZE"

100000
------WebKitFormBoundaryePkpFF7tjBAqx29L
Content-Disposition: form-data; name="uploadedfile"; filename="hello.o"
Content-Type: application/x-object

... contents of file goes here ...
------WebKitFormBoundaryePkpFF7tjBAqx29L--

W nagłówku Content-Type jest informacja o tym że wysyłamy dane jak multipart/form-data czyli właśnie oddzielone tą granicą o której wcześniej wspomniałem. Jej wartość jest również doklejona do Content-Type i równa „—-WebKitFormBoundaryePkpFF7tjBAqx29L” dla tego przykładu.

Następnie lecą już dane oddzielone własnie tą granicą. Z tego powodu serwer HTTP wie kiedy przestać czytać bajty.

W websockecie nie mamy zaimplementowanego jako takiego przesyłania plików – musimy sobie to sami zaimplementować. Dodatkowo warto wspomnieć iż mimo że Websocket do zestawienia połączenia z serwerem używa protokołu HTTP to sam w sobie jest protokołem połączeniowym i dwukierunkowym (bidirectional). Co znaczy że, w odróżnieniu od HTTP, klient może wysyłać dane ciągle w czasie i nie otrzymywać odpowiedzi żadnej. Może też ciągle otrzymywać jakieś powiadomienia z serwera i nie koniecznie na nie odpowiadać – połączenie trwa do momentu opuszczenia strony lub jawnego jego zerwania. A to znaczy że musimy jakoś serwer powiadomić że teraz zaczynamy wysyłać plik, wysyłać go i jeszcze powiadomić serwer że transfer się zakończył.

Protokół jaki zaimplementujemy:

Zacznijmy więc od najprostszej rzeczy, tj kawałka HTMLa:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<html>
<head>
<style>
    .progress{
        width: 40%;
    }

</style>
<script src="websocker_sender.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css">
<link href="//netdna.bootstrapcdn.com/bootstrap/3.0.0/css/bootstrap-glyphicons.css" rel="stylesheet">

</head>
<body>
    <form action="" type="multipart/form-data">
            <input type="file" multiple="multiple" />
            <input type="submit" value="Uploader" />
    </form>

    <div id="progresses">

    </div>

</body></html>

Jak widać dodaliśmy boostrapa, jQuery i jeden plik który teraz zaimplementujemy, zaczynając od obiektu:

1
2
3
4
5
6
7
8
9
10
11
12
13
var WebSockerSender = function(options){
    this.config = options;
    if (!('blockSize' in options))
        this.config.blockSize = 1024 //defaults
   
    this.currentByte = 0;
    this.filename = this.config.file.name;
    this.size = this.config.file.size;

    this.onSuccess = function(){}
    this.onError = function(){}
    this.onProgress = function (){}
};

Property blockSize to długość jednej paczki przesyłanych bajtów, currentByte to bajt od którego mamy zacząć wysyłać aktualną paczkę, więc na początek będziemy wysyłać od currentByte (0) do currentByte + blockSize (1024). Teraz metoda rozpoczynająca upload:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    this.startUpload = function () {
        this.ws = new WebSocket(this.config.url);
        this.ws.senderObject = this;
        this.ws.onmessage = this.onMessage;

        var that = this

        this.ws.onopen = function (params) {
            this.senderObject.ws.send(JSON.stringify({
                'filename': this.senderObject.filename,
                'size': this.senderObject.size,
                'type': 'START'
            }));        
        }

    };

Otwieramy tu połączenie websocketowe i jeśli się ono zestawi (handshake się zakończy) to wysyłamy info (zgodnie z wyżej przestawionym diagramem) że inicjujemy upload. Teraz serwer odpowie nam że jest gotów, wiec zaimplementujmy to w funkcji onMessage:

1
2
3
4
5
6
7
8
9
10
11
12
    this.onMessage = function (params) {
        var data = JSON.parse(params.data);
        if (data.type == 'DATA' && data.code == 200){
            this.senderObject.sendData();
            this.senderObject.currentByte += this.senderObject.config.blockSize;
            this.senderObject.onProgress(this.senderObject.filename,
                    this.senderObject.currentByte, this.senderObject.size);
        } else if (data.type == 'STATUS') {
            this.senderObject.onSuccess(this.senderObject.filename, data.status);
            this.senderObject.ws.close();
        }
    };

Jeśli otrzymaliśmy od serwera type = 'DATA’ to wysyłamy bajty funkcją którą za chwilkę zaimplementujemy, potem przesuwamy wskaźnik o którym pisałem wcześniej i uruchamiany callback „onProgress”.

No i jeszcze funkcja sendData:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  this.sendData = function(){
        if (this.currentByte + this.config.blockSize > this.size){
            this.config.blockSize = this.size - this.currentByte;
        }

        if (this.currentByte == this.size) {
            this.ws.send(JSON.stringify({
                'type': 'DONE'
            }))
            return;
        }
        var blob = this.config.file.slice(this.currentByte, this.currentByte + this.config.blockSize);
        this.reader = new FileReader();

        var self = this;
       
        this.reader.onabort = function() {
            console.log('reader: abort')
        };
       
        this.reader.onerror = function(event) {
            switch(event.target.error.code) {
                case event.target.error.NOT_FOUND_ERR:
                    console.log('File not found');
                    break;
                case event.target.error.NOT_READABLE_ERR:
                    console.log('File is not readable');
                    break;
                case event.target.error.ABORT_ERR:
                    console.log('File upload aborted');
                    break;
                default:
                    console.log('An error occurred reading the file.');
            };
        };
       
        this.reader.onloadend = function(event) {
            self.ws.send(window.btoa(event.target.result));
        };
       
        this.reader.readAsBinaryString(blob);

    };

Sprawdzamy pierw czy jeśli wyślemy paczkę bajtów o standardowej długości to przekroczymy wielkość pliku i jeśli tak to ustawiamy długość ostatniej paczki na tyle ile bajtów zostało do przesłania, dalej sprawdzamy czy wskaźnik jest równy wielkości pliku – jeśli tak to upload się zakończył – informujemy o tym serwer i przestajemy wysyłać dane, nie zamykając połączenia z websocketem – będzie jeszcze info o tym czy plik zawiera wirusy. I na koniec dodajemy możliwość ustawienia callbacków:

1
2
3
4
5
6
7
8
9
10
11
WebSockerSender.prototype.setSuccessCallback = function (func){
    this.onSuccess = func;
};

WebSockerSender.prototype.setErrorCallback = function (func){
    this.onError = func;
};

WebSockerSender.prototype.setOnProgressCallback = function (func){
    this.onProgress = func;
};

Dodamy teraz użycie tej klasy do pliku HTML:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<script>

var createUploadProgressBar = function(filename){
    var tpl = '<div class="progress">  <div class="progress-bar" id="'+filename+'" role="progressbar" aria-valuenow="0"  aria-valuemin="0" aria-valuemax="100" style="width:0%"><span>0%</span><span id="'+filename+'_status"></span></div></div>';
    $("#progresses").append(tpl);
};

var updateProgressBar = function(filename, current, size){
    var value = parseInt(( current / size ) * 100) + '%';
    var progressBar = document.getElementById(btoa(filename));
    progressBar.style.width = value;
    progressBar.children[0].innerHTML = value;
};

var showStatus = function (filename, status){
    var statusIcon = document.getElementById(btoa(filename)+'_status');
    statusIcon.className = status == ('./' + filename + ': OK') ? 'glyphicon glyphicon-ok' : 'glyphicon glyphicon-remove';
};

$(document).ready(function() {
    var socketServerUrl = 'ws://localhost:8000/'
    var files = [];
   
    $('input[type=file]').change(function(event) {
        files = event.target.files;
    });
   
    $('form').submit(function() {
       
        // Checks user has selected one or more files
        if( files.length == 0 ) {
            alert('select files first !');
            return false;
        }
        for(var i=0; i < files.length; i++) {
            createUploadProgressBar(btoa(files[i].name));
            var transfer = new WebSockerSender({
                file: files[i],
                url: socketServerUrl
            });

            transfer.onProgress = updateProgressBar;
            transfer.onSuccess = showStatus;
       
            transfer.startUpload();
           
        }
       
        return false;
    });

});

</script>

Zauważ że tutaj się sporo zadziało – dla każdego uploadowanego pliku tworzę pasek postępu – jest to div z ID równym nazwie pliku w base64 (aby uniknąć problemów z kropka pomiędzy rozszerzeniem a właściwą nazwą pliku) oraz elemennt tworzony również z nazwą pliku w base64 ale z doklejonym sufixem „_status”, gdzie będziemy wrzucać status pliku, tj info o tym czy jest zainfekowany czy nie. Wpiąłem się też odpowiednimi funkcjami które będą manipulować wartości tych divów do callbacków naszej klasy.

No i część kliencką skończyliśmy – teraz czas na Pythona, który wepnie się we wcześniej przestawiony flow – aby móc odbierać wiele plików jednocześnie użyjemy biblioteki gevent która jest pętlą zdarzeń – więcej o niej można przeczytać w moim poprzednim wpisie – klik

Gevent pozwala na uruchomienie każdego połączenia do websocketu w nowym greenlecie. Zaczniemy od szkieletu obsługi WebSocketu:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent
from gevent.monkey import patch_all

patch_all()


def setup():
    from geventwebsocket import WebSocketServer, WebSocketApplication, Resource

    class WSUploader(WebSocketApplication):

        def on_open(self):
            print "Connection opened"

        def on_message(self, message):
            print "Message"

        def on_close(self, reason):
            print "Connection done"

    WebSocketServer(
        ('', 8000),
        Resource({'/': WSUploader})
    ).serve_forever()


setup()

Wydzielimy część odpowiedzialną za same zbieranie danych które przyszły do serwera:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class WebSocketFileReceiver:

    def init(self):
        self.data = []

    def save_filename(self, name):
        self.filename = name

    def collect_data(self, data):
        raw_data = b64decode(data)
        self.data.append(raw_data)
        return len(raw_data)

    def finish(self):
        with open('./' + self.filename, 'w') as file:
            file.write(''.join(self.data))

        del self.data

I niech nasza klasa WSUploader dziedziczy po owe klasie. I jeszcze klasa sprawdzająca plik pod kątem wirusów:

1
2
3
4
5
6
7
8
9
10
class AntyVirusChecker(gevent.Greenlet):
    def __init__(self, filename, callback):
        self.filename = filename
        self.link(callback)
        gevent.Greenlet.__init__(self)

    def _run(self):
        sub = Popen(['clamscan ./' + self.filename], stdout=PIPE, shell=True)
        out, err = sub.communicate()
        return out

która będzie również oddzielnym greenletem – tutaj jest ciekawa rzecz zrobiona, ponieważ dziedzicząc po klasie gevent.Greenlet możemy stworzyć własną klasę greenletu – wygodne rozwiązanie to jest – punktem wejścia jest przedefiniowanie funkcji _run(). W konstruktorze użyłem funkcji 'link’ która wykona się jak greenlet zakończy swoje działanie. W run() uruchamiamy program clamscan który sprawdzi nam plik przekazany w parametrze pod kątem wirusów.

Teraz zdefiniujmy właściwą funkcję obsługującą połączenia z klientem:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    class WSUploader(WebSocketApplication, WebSocketFileReceiver):

        def on_open(self):
            print "Connection opened"

        def notify_about_result(self, result):
            print "File checked, sending response"
            txt_result = result.get()
            status = txt_result.split('\n')[0]
            self.ws.send(dumps({
                'filename': self.filename,
                'status': status,
                'type': 'STATUS'
            }))

        def on_message(self, message):
            try:
                msg = loads(str(message))
                type = msg['type']
            except ValueError:
                type = 'RAW_DATA'

            if type == 'DONE':
                self.finish()
                print "Upload finished. Delegating to check..."
                AntyVirusChecker(self.filename, self.notify_about_result).start()
            elif type == 'START':
                self.init()
                data = {
                    "type": "DATA",
                    "message": "Upload initialized. Wait for data",
                    "code": 200
                }

                self.save_filename(msg['filename'])
                self.ws.send(dumps(data))
            elif not self.ws.closed and type == 'RAW_DATA':
                collected_length = self.collect_data(message)
                data = {
                    "type": "DATA",
                    "code": 200,
                    "bytesRead": collected_length
                }
                self.ws.send(dumps(data))

        def on_close(self, reason):
            print "Connection done"

W gruncie rzeczy kod jest prosty – w funkcji run() dostajemy wiadomość z frontu klientem – próbujemy go potraktować jako JSON – jeśli się nie udaje to zakładamy że to są dane binarne. Jeśli dostaniemy typ „START” to w sumie robimy tylko tyle że zapisujemy sobie nazwę pliku w pamięci i odsyłamy ustalony wcześniej tekst (json). Jeśli typem jest „DONE” to zapisujemy pobrany plik na dysku i delegujemy sprawdzanie czy jest on zainfekowany do osobnego greenletu – tego który stworzyliśmy wcześniej. W jego konstruktorze przekazujemy wskaźnik do funkcji która jest przekazana do funkcji link() – spowoduje to uruchomienie jej gdy plik zostanie sprawdzony – a ona sama wyśle po prostu info o tym do frontu.

Clamscan zwraca rezultat na standardowe wyjście w formie:

1
2
3
4
5
6
7
8
9
10
11
0801E0.jpg: OK

----------- SCAN SUMMARY -----------
Known viruses: 6385286
Engine version: 0.99.2
Scanned directories: 0
Scanned files: 1
Infected files: 0
Data scanned: 0.03 MB
Data read: 0.03 MB (ratio 1.00:1)
Time: 18.792 sec (0 m 18 s)

Więc my odsyłamy na front tylko pierwszą linię.

Oczywiście kod nie jest idealny i raczej nie nadaje się na produkcję. Po pierwsze pliki są zapisywany tymczasowo w pamięci – powinny być pchane od razu do pliku. Teraz zauważyłem też że wskaźnik na froncie przesuwam nie o zwrócona ilość bajtów a o wielkość bloku co jest bez sensem – przesuwanie o zwróconą ilość bajtów pozwala na prostą obsługę retransmisji.

Załączam demo w formie gifa:

[WPGP gif_id=”2347″ width=”100%”]

Dzięki za wizytę,
Mateusz Mazurek

A może wolisz nowości na mail?

Subskrybuj
Powiadom o
guest

Witryna wykorzystuje Akismet, aby ograniczyć spam. Dowiedz się więcej jak przetwarzane są dane komentarzy.

2 komentarzy
Inline Feedbacks
View all comments