Przesyłanie plików przez WebSocket i pętla zdarzeń jako serwer – gevent

Hello!
Tym razem zacznę nieco inaczej – bo tym razem nie zacznę od „ale dawno tu nie pisałem” ;) Dziś kolejna pokazówka możliwości biblioteki gevent.

Napiszemy program który wysyła plik na serwer poprzez websocket, serwer sprawdza czy plik nie ma wirusów i odsyła status. Czemu websocket a nie HTTP? Bo na HTTP przykład znajdziecie wszędzie :P

Może najpierw przyjrzyjmy się temu jak wysyłany jest plik na serwer w protokole HTTP. Protokół ten używa wygenerowanego przez siebie ciągu znaków który jest granicą (boundary) pomiędzy polami formularza. Poniżej zrzut z uploadu via HTTP:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
POST /upload?upload_progress_id=12344 HTTP/1.1
Host: localhost:3000
Content-Length: 1325
Origin: http://localhost:3000
... other headers ...
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryePkpFF7tjBAqx29L

------WebKitFormBoundaryePkpFF7tjBAqx29L
Content-Disposition: form-data; name="MAX_FILE_SIZE"

100000
------WebKitFormBoundaryePkpFF7tjBAqx29L
Content-Disposition: form-data; name="uploadedfile"; filename="hello.o"
Content-Type: application/x-object

... contents of file goes here ...
------WebKitFormBoundaryePkpFF7tjBAqx29L--

W nagłówku Content-Type jest informacja o tym że wysyłamy dane jak multipart/form-data czyli właśnie oddzielone tą granicą o której wcześniej wspomniałem. Jej wartość jest również doklejona do Content-Type i równa „—-WebKitFormBoundaryePkpFF7tjBAqx29L” dla tego przykładu.

Następnie lecą już dane oddzielone własnie tą granicą. Z tego powodu serwer HTTP wie kiedy przestać czytać bajty.

W websockecie nie mamy zaimplementowanego jako takiego przesyłania plików – musimy sobie to sami zaimplementować. Dodatkowo warto wspomnieć iż mimo że Websocket do zestawienia połączenia z serwerem używa protokołu HTTP to sam w sobie jest protokołem połączeniowym i dwukierunkowym (bidirectional). Co znaczy że, w odróżnieniu od HTTP, klient może wysyłać dane ciągle w czasie i nie otrzymywać odpowiedzi żadnej. Może też ciągle otrzymywać jakieś powiadomienia z serwera i nie koniecznie na nie odpowiadać – połączenie trwa do momentu opuszczenia strony lub jawnego jego zerwania. A to znaczy że musimy jakoś serwer powiadomić że teraz zaczynamy wysyłać plik, wysyłać go i jeszcze powiadomić serwer że transfer się zakończył.

Protokół jaki zaimplementujemy:

Zacznijmy więc od najprostszej rzeczy, tj kawałka HTMLa:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<html>
<head>
<style>
  .progress{
      width: 40%;
  }

</style>
<script src="websocker_sender.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css">
<link href="//netdna.bootstrapcdn.com/bootstrap/3.0.0/css/bootstrap-glyphicons.css" rel="stylesheet">

</head>
<body>
  <form action="" type="multipart/form-data">
          <input type="file" multiple="multiple" />
          <input type="submit" value="Uploader" />
  </form>

  <div id="progresses">

  </div>

</body></html>

Jak widać dodaliśmy boostrapa, jQuery i jeden plik który teraz zaimplementujemy, zaczynając od obiektu:

1
2
3
4
5
6
7
8
9
10
11
12
13
var WebSockerSender = function(options){
  this.config = options;
  if (!('blockSize' in options))
      this.config.blockSize = 1024 //defaults
 
  this.currentByte = 0;
  this.filename = this.config.file.name;
  this.size = this.config.file.size;

  this.onSuccess = function(){}
  this.onError = function(){}
  this.onProgress = function (){}
};

Property blockSize to długość jednej paczki przesyłanych bajtów, currentByte to bajt od którego mamy zacząć wysyłać aktualną paczkę, więc na początek będziemy wysyłać od currentByte (0) do currentByte + blockSize (1024). Teraz metoda rozpoczynająca upload:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  this.startUpload = function () {
      this.ws = new WebSocket(this.config.url);
      this.ws.senderObject = this;
      this.ws.onmessage = this.onMessage;

      var that = this

      this.ws.onopen = function (params) {
          this.senderObject.ws.send(JSON.stringify({
              'filename': this.senderObject.filename,
              'size': this.senderObject.size,
              'type': 'START'
          }));      
      }

  };

Otwieramy tu połączenie websocketowe i jeśli się ono zestawi (handshake się zakończy) to wysyłamy info (zgodnie z wyżej przestawionym diagramem) że inicjujemy upload. Teraz serwer odpowie nam że jest gotów, wiec zaimplementujmy to w funkcji onMessage:

1
2
3
4
5
6
7
8
9
10
11
12
  this.onMessage = function (params) {
      var data = JSON.parse(params.data);
      if (data.type == 'DATA' && data.code == 200){
          this.senderObject.sendData();
          this.senderObject.currentByte += this.senderObject.config.blockSize;
          this.senderObject.onProgress(this.senderObject.filename,
                  this.senderObject.currentByte, this.senderObject.size);
      } else if (data.type == 'STATUS') {
          this.senderObject.onSuccess(this.senderObject.filename, data.status);
          this.senderObject.ws.close();
      }
  };

Jeśli otrzymaliśmy od serwera type = 'DATA’ to wysyłamy bajty funkcją którą za chwilkę zaimplementujemy, potem przesuwamy wskaźnik o którym pisałem wcześniej i uruchamiany callback „onProgress”.

No i jeszcze funkcja sendData:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
this.sendData = function(){
      if (this.currentByte + this.config.blockSize > this.size){
          this.config.blockSize = this.size - this.currentByte;
      }

      if (this.currentByte == this.size) {
          this.ws.send(JSON.stringify({
              'type': 'DONE'
          }))
          return;
      }
      var blob = this.config.file.slice(this.currentByte, this.currentByte + this.config.blockSize);
      this.reader = new FileReader();

      var self = this;
     
      this.reader.onabort = function() {
          console.log('reader: abort')
      };
     
      this.reader.onerror = function(event) {
          switch(event.target.error.code) {
              case event.target.error.NOT_FOUND_ERR:
                  console.log('File not found');
                  break;
              case event.target.error.NOT_READABLE_ERR:
                  console.log('File is not readable');
                  break;
              case event.target.error.ABORT_ERR:
                  console.log('File upload aborted');
                  break;
              default:
                  console.log('An error occurred reading the file.');
          };
      };
     
      this.reader.onloadend = function(event) {
          self.ws.send(window.btoa(event.target.result));
      };
     
      this.reader.readAsBinaryString(blob);

  };

Sprawdzamy pierw czy jeśli wyślemy paczkę bajtów o standardowej długości to przekroczymy wielkość pliku i jeśli tak to ustawiamy długość ostatniej paczki na tyle ile bajtów zostało do przesłania, dalej sprawdzamy czy wskaźnik jest równy wielkości pliku – jeśli tak to upload się zakończył – informujemy o tym serwer i przestajemy wysyłać dane, nie zamykając połączenia z websocketem – będzie jeszcze info o tym czy plik zawiera wirusy. I na koniec dodajemy możliwość ustawienia callbacków:

1
2
3
4
5
6
7
8
9
10
11
WebSockerSender.prototype.setSuccessCallback = function (func){
  this.onSuccess = func;
};

WebSockerSender.prototype.setErrorCallback = function (func){
  this.onError = func;
};

WebSockerSender.prototype.setOnProgressCallback = function (func){
  this.onProgress = func;
};

Dodamy teraz użycie tej klasy do pliku HTML:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<script>

var createUploadProgressBar = function(filename){
  var tpl = '<div class="progress">  <div class="progress-bar" id="'+filename+'" role="progressbar" aria-valuenow="0"  aria-valuemin="0" aria-valuemax="100" style="width:0%"><span>0%</span><span id="'+filename+'_status"></span></div></div>';
  $("#progresses").append(tpl);
};

var updateProgressBar = function(filename, current, size){
  var value = parseInt(( current / size ) * 100) + '%';
  var progressBar = document.getElementById(btoa(filename));
  progressBar.style.width = value;
  progressBar.children[0].innerHTML = value;
};

var showStatus = function (filename, status){
  var statusIcon = document.getElementById(btoa(filename)+'_status');
  statusIcon.className = status == ('./' + filename + ': OK') ? 'glyphicon glyphicon-ok' : 'glyphicon glyphicon-remove';
};

$(document).ready(function() {
  var socketServerUrl = 'ws://localhost:8000/'
  var files = [];
 
  $('input[type=file]').change(function(event) {
      files = event.target.files;
  });
 
  $('form').submit(function() {
     
      // Checks user has selected one or more files
      if( files.length == 0 ) {
          alert('select files first !');
          return false;
      }
      for(var i=0; i < files.length; i++) {
          createUploadProgressBar(btoa(files[i].name));
          var transfer = new WebSockerSender({
              file: files[i],
              url: socketServerUrl
          });

          transfer.onProgress = updateProgressBar;
          transfer.onSuccess = showStatus;
     
          transfer.startUpload();
         
      }
     
      return false;
  });

});

</script>

Zauważ że tutaj się sporo zadziało – dla każdego uploadowanego pliku tworzę pasek postępu – jest to div z ID równym nazwie pliku w base64 (aby uniknąć problemów z kropka pomiędzy rozszerzeniem a właściwą nazwą pliku) oraz elemennt tworzony również z nazwą pliku w base64 ale z doklejonym sufixem „_status”, gdzie będziemy wrzucać status pliku, tj info o tym czy jest zainfekowany czy nie. Wpiąłem się też odpowiednimi funkcjami które będą manipulować wartości tych divów do callbacków naszej klasy.

No i część kliencką skończyliśmy – teraz czas na Pythona, który wepnie się we wcześniej przestawiony flow – aby móc odbierać wiele plików jednocześnie użyjemy biblioteki gevent która jest pętlą zdarzeń – więcej o niej można przeczytać w moim poprzednim wpisie – klik

Gevent pozwala na uruchomienie każdego połączenia do websocketu w nowym greenlecie. Zaczniemy od szkieletu obsługi WebSocketu:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent
from gevent.monkey import patch_all

patch_all()


def setup():
  from geventwebsocket import WebSocketServer, WebSocketApplication, Resource

  class WSUploader(WebSocketApplication):

      def on_open(self):
          print "Connection opened"

      def on_message(self, message):
          print "Message"

      def on_close(self, reason):
          print "Connection done"

  WebSocketServer(
      ('', 8000),
      Resource({'/': WSUploader})
  ).serve_forever()


setup()

Wydzielimy część odpowiedzialną za same zbieranie danych które przyszły do serwera:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class WebSocketFileReceiver:

  def init(self):
      self.data = []

  def save_filename(self, name):
      self.filename = name

  def collect_data(self, data):
      raw_data = b64decode(data)
      self.data.append(raw_data)
      return len(raw_data)

  def finish(self):
      with open('./' + self.filename, 'w') as file:
          file.write(''.join(self.data))

      del self.data

I niech nasza klasa WSUploader dziedziczy po owe klasie. I jeszcze klasa sprawdzająca plik pod kątem wirusów:

1
2
3
4
5
6
7
8
9
10
class AntyVirusChecker(gevent.Greenlet):
  def __init__(self, filename, callback):
      self.filename = filename
      self.link(callback)
      gevent.Greenlet.__init__(self)

  def _run(self):
      sub = Popen(['clamscan ./' + self.filename], stdout=PIPE, shell=True)
      out, err = sub.communicate()
      return out

która będzie również oddzielnym greenletem – tutaj jest ciekawa rzecz zrobiona, ponieważ dziedzicząc po klasie gevent.Greenlet możemy stworzyć własną klasę greenletu – wygodne rozwiązanie to jest – punktem wejścia jest przedefiniowanie funkcji _run(). W konstruktorze użyłem funkcji 'link’ która wykona się jak greenlet zakończy swoje działanie. W run() uruchamiamy program clamscan który sprawdzi nam plik przekazany w parametrze pod kątem wirusów.

Teraz zdefiniujmy właściwą funkcję obsługującą połączenia z klientem:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
  class WSUploader(WebSocketApplication, WebSocketFileReceiver):

      def on_open(self):
          print "Connection opened"

      def notify_about_result(self, result):
          print "File checked, sending response"
          txt_result = result.get()
          status = txt_result.split('\n')[0]
          self.ws.send(dumps({
              'filename': self.filename,
              'status': status,
              'type': 'STATUS'
          }))

      def on_message(self, message):
          try:
              msg = loads(str(message))
              type = msg['type']
          except ValueError:
              type = 'RAW_DATA'

          if type == 'DONE':
              self.finish()
              print "Upload finished. Delegating to check..."
              AntyVirusChecker(self.filename, self.notify_about_result).start()
          elif type == 'START':
              self.init()
              data = {
                  "type": "DATA",
                  "message": "Upload initialized. Wait for data",
                  "code": 200
              }

              self.save_filename(msg['filename'])
              self.ws.send(dumps(data))
          elif not self.ws.closed and type == 'RAW_DATA':
              collected_length = self.collect_data(message)
              data = {
                  "type": "DATA",
                  "code": 200,
                  "bytesRead": collected_length
              }
              self.ws.send(dumps(data))

      def on_close(self, reason):
          print "Connection done"

W gruncie rzeczy kod jest prosty – w funkcji run() dostajemy wiadomość z frontu klientem – próbujemy go potraktować jako JSON – jeśli się nie udaje to zakładamy że to są dane binarne. Jeśli dostaniemy typ „START” to w sumie robimy tylko tyle że zapisujemy sobie nazwę pliku w pamięci i odsyłamy ustalony wcześniej tekst (json). Jeśli typem jest „DONE” to zapisujemy pobrany plik na dysku i delegujemy sprawdzanie czy jest on zainfekowany do osobnego greenletu – tego który stworzyliśmy wcześniej. W jego konstruktorze przekazujemy wskaźnik do funkcji która jest przekazana do funkcji link() – spowoduje to uruchomienie jej gdy plik zostanie sprawdzony – a ona sama wyśle po prostu info o tym do frontu.

Clamscan zwraca rezultat na standardowe wyjście w formie:

1
2
3
4
5
6
7
8
9
10
11
0801E0.jpg: OK

----------- SCAN SUMMARY -----------
Known viruses: 6385286
Engine version: 0.99.2
Scanned directories: 0
Scanned files: 1
Infected files: 0
Data scanned: 0.03 MB
Data read: 0.03 MB (ratio 1.00:1)
Time: 18.792 sec (0 m 18 s)

Więc my odsyłamy na front tylko pierwszą linię.

Oczywiście kod nie jest idealny i raczej nie nadaje się na produkcję. Po pierwsze pliki są zapisywany tymczasowo w pamięci – powinny być pchane od razu do pliku. Teraz zauważyłem też że wskaźnik na froncie przesuwam nie o zwrócona ilość bajtów a o wielkość bloku co jest bez sensem – przesuwanie o zwróconą ilość bajtów pozwala na prostą obsługę retransmisji.

Załączam demo w formie gifa:

[WPGP gif_id=”2347″ width=”100%”]

Dzięki za wizytę,
Mateusz Mazurek
Mateusz M.

Ostatnie wpisy

Podsumowanie: luty i marzec 2024

Ostatnio tygodnie były tak bardzo wypełnione, że nie udało mi się napisać nawet krótkiego podsumowanie. Więc dziś zbiorczo podsumuję luty… Read More

1 tydzień ago

Podsumowanie: styczeń 2024

Zapraszam na krótkie podsumowanie miesiąca. Książki W styczniu przeczytałem "Homo Deus: Historia jutra". Książka łudząco podoba do wcześniejszej książki tego… Read More

2 miesiące ago

Podsumowanie roku 2023

Cześć! Zapraszam na podsumowanie roku 2023. Książki Zacznijmy od książek. W tym roku cel 35 książek nie został osiągnięty. Niemniej… Read More

3 miesiące ago

Podsumowanie: grudzień 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W grudniu skończyłem czytać Mein Kampf. Nudna książka. Ciekawsze fragmenty można by było streścić… Read More

4 miesiące ago

Praca zdalna – co z nią dalej?

Cześć, ostatnio w Internecie pojawiło się dużo artykułów, które nie były przychylne pracy zdalnej. Z drugiej strony większość komentarzy pod… Read More

4 miesiące ago

Podsumowanie: listopad 2023

Zapraszam na krótkie podsumowanie miesiąca. Książki W listopadzie dokończyłem cykl "Z mgły zrodzony" Sandersona. Tylko "Stop prawa" mi nie do… Read More

5 miesięcy ago