websocket.py 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030
  1. #!/usr/bin/env python
  2. '''
  3. Python WebSocket library with support for "wss://" encryption.
  4. Copyright 2011 Joel Martin
  5. Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)
  6. Supports following protocol versions:
  7. - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-07
  8. - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
  9. - http://tools.ietf.org/html/rfc6455
  10. You can make a cert/key with openssl using:
  11. openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
  12. as taken from http://docs.python.org/dev/library/ssl.html#certificates
  13. '''
  14. import os, sys, time, errno, signal, socket, select, logging
  15. import array, struct
  16. from base64 import b64encode, b64decode
  17. # Imports that vary by python version
  18. # python 3.0 differences
  19. if sys.hexversion > 0x3000000:
  20. b2s = lambda buf: buf.decode('latin_1')
  21. s2b = lambda s: s.encode('latin_1')
  22. s2a = lambda s: s
  23. else:
  24. b2s = lambda buf: buf # No-op
  25. s2b = lambda s: s # No-op
  26. s2a = lambda s: [ord(c) for c in s]
  27. try: from io import StringIO
  28. except: from cStringIO import StringIO
  29. try: from http.server import SimpleHTTPRequestHandler
  30. except: from SimpleHTTPServer import SimpleHTTPRequestHandler
  31. # python 2.6 differences
  32. try: from hashlib import sha1
  33. except: from sha import sha as sha1
  34. # python 2.5 differences
  35. try:
  36. from struct import pack, unpack_from
  37. except:
  38. from struct import pack
  39. def unpack_from(fmt, buf, offset=0):
  40. slice = buffer(buf, offset, struct.calcsize(fmt))
  41. return struct.unpack(fmt, slice)
  42. # Degraded functionality if these imports are missing
  43. for mod, msg in [('numpy', 'HyBi protocol will be slower'),
  44. ('ssl', 'TLS/SSL/wss is disabled'),
  45. ('multiprocessing', 'Multi-Processing is disabled'),
  46. ('resource', 'daemonizing is disabled')]:
  47. try:
  48. globals()[mod] = __import__(mod)
  49. except ImportError:
  50. globals()[mod] = None
  51. print("WARNING: no '%s' module, %s" % (mod, msg))
  52. if multiprocessing and sys.platform == 'win32':
  53. # make sockets pickle-able/inheritable
  54. import multiprocessing.reduction
  55. # HTTP handler with WebSocket upgrade support
  56. class WebSocketRequestHandler(SimpleHTTPRequestHandler):
  57. """
  58. WebSocket Request Handler Class, derived from SimpleHTTPRequestHandler.
  59. Must be sub-classed with new_websocket_client method definition.
  60. The request handler can be configured by setting optional
  61. attributes on the server object:
  62. * only_upgrade: If true, SimpleHTTPRequestHandler will not be enabled,
  63. only websocket is allowed.
  64. * verbose: If true, verbose logging is activated.
  65. * daemon: Running as daemon, do not write to console etc
  66. * record: Record raw frame data as JavaScript array into specified filename
  67. * run_once: Handle a single request
  68. * handler_id: A sequence number for this connection, appended to record filename
  69. """
  70. buffer_size = 65536
  71. GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
  72. server_version = "WebSockify"
  73. protocol_version = "HTTP/1.1"
  74. # An exception while the WebSocket client was connected
  75. class CClose(Exception):
  76. pass
  77. def __init__(self, req, addr, server):
  78. # Retrieve a few configuration variables from the server
  79. self.only_upgrade = getattr(server, "only_upgrade", False)
  80. self.verbose = getattr(server, "verbose", False)
  81. self.daemon = getattr(server, "daemon", False)
  82. self.record = getattr(server, "record", False)
  83. self.run_once = getattr(server, "run_once", False)
  84. self.rec = None
  85. self.handler_id = getattr(server, "handler_id", False)
  86. self.file_only = getattr(server, "file_only", False)
  87. self.traffic = getattr(server, "traffic", False)
  88. self.logger = getattr(server, "logger", None)
  89. if self.logger is None:
  90. self.logger = WebSocketServer.get_logger()
  91. SimpleHTTPRequestHandler.__init__(self, req, addr, server)
  92. @staticmethod
  93. def unmask(buf, hlen, plen):
  94. pstart = hlen + 4
  95. pend = pstart + plen
  96. if numpy:
  97. b = c = s2b('')
  98. if plen >= 4:
  99. mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
  100. offset=hlen, count=1)
  101. data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
  102. offset=pstart, count=int(plen / 4))
  103. #b = numpy.bitwise_xor(data, mask).data
  104. b = numpy.bitwise_xor(data, mask).tostring()
  105. if plen % 4:
  106. #self.msg("Partial unmask")
  107. mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
  108. offset=hlen, count=(plen % 4))
  109. data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
  110. offset=pend - (plen % 4),
  111. count=(plen % 4))
  112. c = numpy.bitwise_xor(data, mask).tostring()
  113. return b + c
  114. else:
  115. # Slower fallback
  116. mask = buf[hlen:hlen+4]
  117. data = array.array('B')
  118. mask = s2a(mask)
  119. data.fromstring(buf[pstart:pend])
  120. for i in range(len(data)):
  121. data[i] ^= mask[i % 4]
  122. return data.tostring()
  123. @staticmethod
  124. def encode_hybi(buf, opcode, base64=False):
  125. """ Encode a HyBi style WebSocket frame.
  126. Optional opcode:
  127. 0x0 - continuation
  128. 0x1 - text frame (base64 encode buf)
  129. 0x2 - binary frame (use raw buf)
  130. 0x8 - connection close
  131. 0x9 - ping
  132. 0xA - pong
  133. """
  134. if base64:
  135. buf = b64encode(buf)
  136. b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
  137. payload_len = len(buf)
  138. if payload_len <= 125:
  139. header = pack('>BB', b1, payload_len)
  140. elif payload_len > 125 and payload_len < 65536:
  141. header = pack('>BBH', b1, 126, payload_len)
  142. elif payload_len >= 65536:
  143. header = pack('>BBQ', b1, 127, payload_len)
  144. #self.msg("Encoded: %s", repr(header + buf))
  145. return header + buf, len(header), 0
  146. @staticmethod
  147. def decode_hybi(buf, base64=False, logger=None):
  148. """ Decode HyBi style WebSocket packets.
  149. Returns:
  150. {'fin' : 0_or_1,
  151. 'opcode' : number,
  152. 'masked' : boolean,
  153. 'hlen' : header_bytes_number,
  154. 'length' : payload_bytes_number,
  155. 'payload' : decoded_buffer,
  156. 'left' : bytes_left_number,
  157. 'close_code' : number,
  158. 'close_reason' : string}
  159. """
  160. f = {'fin' : 0,
  161. 'opcode' : 0,
  162. 'masked' : False,
  163. 'hlen' : 2,
  164. 'length' : 0,
  165. 'payload' : None,
  166. 'left' : 0,
  167. 'close_code' : 1000,
  168. 'close_reason' : ''}
  169. if logger is None:
  170. logger = WebSocketServer.get_logger()
  171. blen = len(buf)
  172. f['left'] = blen
  173. if blen < f['hlen']:
  174. return f # Incomplete frame header
  175. b1, b2 = unpack_from(">BB", buf)
  176. f['opcode'] = b1 & 0x0f
  177. f['fin'] = (b1 & 0x80) >> 7
  178. f['masked'] = (b2 & 0x80) >> 7
  179. f['length'] = b2 & 0x7f
  180. if f['length'] == 126:
  181. f['hlen'] = 4
  182. if blen < f['hlen']:
  183. return f # Incomplete frame header
  184. (f['length'],) = unpack_from('>xxH', buf)
  185. elif f['length'] == 127:
  186. f['hlen'] = 10
  187. if blen < f['hlen']:
  188. return f # Incomplete frame header
  189. (f['length'],) = unpack_from('>xxQ', buf)
  190. full_len = f['hlen'] + f['masked'] * 4 + f['length']
  191. if blen < full_len: # Incomplete frame
  192. return f # Incomplete frame header
  193. # Number of bytes that are part of the next frame(s)
  194. f['left'] = blen - full_len
  195. # Process 1 frame
  196. if f['masked']:
  197. # unmask payload
  198. f['payload'] = WebSocketRequestHandler.unmask(buf, f['hlen'],
  199. f['length'])
  200. else:
  201. logger.debug("Unmasked frame: %s" % repr(buf))
  202. f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]
  203. if base64 and f['opcode'] in [1, 2]:
  204. try:
  205. f['payload'] = b64decode(f['payload'])
  206. except:
  207. logger.exception("Exception while b64decoding buffer: %s" %
  208. (repr(buf)))
  209. raise
  210. if f['opcode'] == 0x08:
  211. if f['length'] >= 2:
  212. f['close_code'] = unpack_from(">H", f['payload'])[0]
  213. if f['length'] > 3:
  214. f['close_reason'] = f['payload'][2:]
  215. return f
  216. #
  217. # WebSocketRequestHandler logging/output functions
  218. #
  219. def print_traffic(self, token="."):
  220. """ Show traffic flow mode. """
  221. if self.traffic:
  222. sys.stdout.write(token)
  223. sys.stdout.flush()
  224. def msg(self, msg, *args, **kwargs):
  225. """ Output message with handler_id prefix. """
  226. prefix = "% 3d: " % self.handler_id
  227. self.logger.log(logging.INFO, "%s%s" % (prefix, msg), *args, **kwargs)
  228. def vmsg(self, msg, *args, **kwargs):
  229. """ Same as msg() but as debug. """
  230. prefix = "% 3d: " % self.handler_id
  231. self.logger.log(logging.DEBUG, "%s%s" % (prefix, msg), *args, **kwargs)
  232. def warn(self, msg, *args, **kwargs):
  233. """ Same as msg() but as warning. """
  234. prefix = "% 3d: " % self.handler_id
  235. self.logger.log(logging.WARN, "%s%s" % (prefix, msg), *args, **kwargs)
  236. #
  237. # Main WebSocketRequestHandler methods
  238. #
  239. def send_frames(self, bufs=None):
  240. """ Encode and send WebSocket frames. Any frames already
  241. queued will be sent first. If buf is not set then only queued
  242. frames will be sent. Returns the number of pending frames that
  243. could not be fully sent. If returned pending frames is greater
  244. than 0, then the caller should call again when the socket is
  245. ready. """
  246. tdelta = int(time.time()*1000) - self.start_time
  247. if bufs:
  248. for buf in bufs:
  249. if self.base64:
  250. encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=1, base64=True)
  251. else:
  252. encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=2, base64=False)
  253. if self.rec:
  254. self.rec.write("%s,\n" %
  255. repr("{%s{" % tdelta
  256. + encbuf[lenhead:len(encbuf)-lentail]))
  257. self.send_parts.append(encbuf)
  258. while self.send_parts:
  259. # Send pending frames
  260. buf = self.send_parts.pop(0)
  261. sent = self.request.send(buf)
  262. if sent == len(buf):
  263. self.print_traffic("<")
  264. else:
  265. self.print_traffic("<.")
  266. self.send_parts.insert(0, buf[sent:])
  267. break
  268. return len(self.send_parts)
  269. def recv_frames(self):
  270. """ Receive and decode WebSocket frames.
  271. Returns:
  272. (bufs_list, closed_string)
  273. """
  274. closed = False
  275. bufs = []
  276. tdelta = int(time.time()*1000) - self.start_time
  277. buf = self.request.recv(self.buffer_size)
  278. if len(buf) == 0:
  279. closed = {'code': 1000, 'reason': "Client closed abruptly"}
  280. return bufs, closed
  281. if self.recv_part:
  282. # Add partially received frames to current read buffer
  283. buf = self.recv_part + buf
  284. self.recv_part = None
  285. while buf:
  286. frame = self.decode_hybi(buf, base64=self.base64,
  287. logger=self.logger)
  288. #self.msg("Received buf: %s, frame: %s", repr(buf), frame)
  289. if frame['payload'] == None:
  290. # Incomplete/partial frame
  291. self.print_traffic("}.")
  292. if frame['left'] > 0:
  293. self.recv_part = buf[-frame['left']:]
  294. break
  295. else:
  296. if frame['opcode'] == 0x8: # connection close
  297. closed = {'code': frame['close_code'],
  298. 'reason': frame['close_reason']}
  299. break
  300. self.print_traffic("}")
  301. if self.rec:
  302. start = frame['hlen']
  303. end = frame['hlen'] + frame['length']
  304. if frame['masked']:
  305. recbuf = WebSocketRequestHandler.unmask(buf, frame['hlen'],
  306. frame['length'])
  307. else:
  308. recbuf = buf[frame['hlen']:frame['hlen'] +
  309. frame['length']]
  310. self.rec.write("%s,\n" %
  311. repr("}%s}" % tdelta + recbuf))
  312. bufs.append(frame['payload'])
  313. if frame['left']:
  314. buf = buf[-frame['left']:]
  315. else:
  316. buf = ''
  317. return bufs, closed
  318. def send_close(self, code=1000, reason=''):
  319. """ Send a WebSocket orderly close frame. """
  320. msg = pack(">H%ds" % len(reason), code, reason)
  321. buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
  322. self.request.send(buf)
  323. def do_websocket_handshake(self):
  324. h = self.headers
  325. prot = 'WebSocket-Protocol'
  326. protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')
  327. ver = h.get('Sec-WebSocket-Version')
  328. if ver:
  329. # HyBi/IETF version of the protocol
  330. # HyBi-07 report version 7
  331. # HyBi-08 - HyBi-12 report version 8
  332. # HyBi-13 reports version 13
  333. if ver in ['7', '8', '13']:
  334. self.version = "hybi-%02d" % int(ver)
  335. else:
  336. self.send_error(400, "Unsupported protocol version %s" % ver)
  337. return False
  338. key = h['Sec-WebSocket-Key']
  339. # Choose binary if client supports it
  340. if 'binary' in protocols:
  341. self.base64 = False
  342. elif 'base64' in protocols:
  343. self.base64 = True
  344. else:
  345. self.send_error(400, "Client must support 'binary' or 'base64' protocol")
  346. return False
  347. # Generate the hash value for the accept header
  348. accept = b64encode(sha1(s2b(key + self.GUID)).digest())
  349. self.send_response(101, "Switching Protocols")
  350. self.send_header("Upgrade", "websocket")
  351. self.send_header("Connection", "Upgrade")
  352. self.send_header("Sec-WebSocket-Accept", b2s(accept))
  353. if self.base64:
  354. self.send_header("Sec-WebSocket-Protocol", "base64")
  355. else:
  356. self.send_header("Sec-WebSocket-Protocol", "binary")
  357. self.end_headers()
  358. return True
  359. else:
  360. self.send_error(400, "Missing Sec-WebSocket-Version header. Hixie protocols not supported.")
  361. return False
  362. def handle_websocket(self):
  363. """Upgrade a connection to Websocket, if requested. If this succeeds,
  364. new_websocket_client() will be called. Otherwise, False is returned.
  365. """
  366. if (self.headers.get('upgrade') and
  367. self.headers.get('upgrade').lower() == 'websocket'):
  368. if not self.do_websocket_handshake():
  369. return False
  370. # Indicate to server that a Websocket upgrade was done
  371. self.server.ws_connection = True
  372. # Initialize per client settings
  373. self.send_parts = []
  374. self.recv_part = None
  375. self.start_time = int(time.time()*1000)
  376. # client_address is empty with, say, UNIX domain sockets
  377. client_addr = ""
  378. is_ssl = False
  379. try:
  380. client_addr = self.client_address[0]
  381. is_ssl = self.client_address[2]
  382. except IndexError:
  383. pass
  384. if is_ssl:
  385. self.stype = "SSL/TLS (wss://)"
  386. else:
  387. self.stype = "Plain non-SSL (ws://)"
  388. self.log_message("%s: %s WebSocket connection", client_addr,
  389. self.stype)
  390. self.log_message("%s: Version %s, base64: '%s'", client_addr,
  391. self.version, self.base64)
  392. if self.path != '/':
  393. self.log_message("%s: Path: '%s'", client_addr, self.path)
  394. if self.record:
  395. # Record raw frame data as JavaScript array
  396. fname = "%s.%s" % (self.record,
  397. self.handler_id)
  398. self.log_message("opening record file: %s", fname)
  399. self.rec = open(fname, 'w+')
  400. encoding = "binary"
  401. if self.base64: encoding = "base64"
  402. self.rec.write("var VNC_frame_encoding = '%s';\n"
  403. % encoding)
  404. self.rec.write("var VNC_frame_data = [\n")
  405. try:
  406. self.new_websocket_client()
  407. except self.CClose:
  408. # Close the client
  409. _, exc, _ = sys.exc_info()
  410. self.send_close(exc.args[0], exc.args[1])
  411. return True
  412. else:
  413. return False
  414. def do_GET(self):
  415. """Handle GET request. Calls handle_websocket(). If unsuccessful,
  416. and web server is enabled, SimpleHTTPRequestHandler.do_GET will be called."""
  417. if not self.handle_websocket():
  418. if self.only_upgrade:
  419. self.send_error(405, "Method Not Allowed")
  420. else:
  421. SimpleHTTPRequestHandler.do_GET(self)
  422. def list_directory(self, path):
  423. if self.file_only:
  424. self.send_error(404, "No such file")
  425. else:
  426. return SimpleHTTPRequestHandler.list_directory(self, path)
  427. def new_websocket_client(self):
  428. """ Do something with a WebSockets client connection. """
  429. raise Exception("WebSocketRequestHandler.new_websocket_client() must be overloaded")
  430. def do_HEAD(self):
  431. if self.only_upgrade:
  432. self.send_error(405, "Method Not Allowed")
  433. else:
  434. SimpleHTTPRequestHandler.do_HEAD(self)
  435. def finish(self):
  436. if self.rec:
  437. self.rec.write("'EOF'];\n")
  438. self.rec.close()
  439. def handle(self):
  440. # When using run_once, we have a single process, so
  441. # we cannot loop in BaseHTTPRequestHandler.handle; we
  442. # must return and handle new connections
  443. if self.run_once:
  444. self.handle_one_request()
  445. else:
  446. SimpleHTTPRequestHandler.handle(self)
  447. def log_request(self, code='-', size='-'):
  448. if self.verbose:
  449. SimpleHTTPRequestHandler.log_request(self, code, size)
  450. class WebSocketServer(object):
  451. """
  452. WebSockets server class.
  453. As an alternative, the standard library SocketServer can be used
  454. """
  455. policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
  456. log_prefix = "websocket"
  457. # An exception before the WebSocket connection was established
  458. class EClose(Exception):
  459. pass
  460. class Terminate(Exception):
  461. pass
  462. def __init__(self, RequestHandlerClass, listen_host='',
  463. listen_port=None, source_is_ipv6=False,
  464. verbose=False, cert='', key='', ssl_only=None,
  465. daemon=False, record='', web='',
  466. file_only=False,
  467. run_once=False, timeout=0, idle_timeout=0, traffic=False,
  468. tcp_keepalive=True, tcp_keepcnt=None, tcp_keepidle=None,
  469. tcp_keepintvl=None):
  470. # settings
  471. self.RequestHandlerClass = RequestHandlerClass
  472. self.verbose = verbose
  473. self.listen_host = listen_host
  474. self.listen_port = listen_port
  475. self.prefer_ipv6 = source_is_ipv6
  476. self.ssl_only = ssl_only
  477. self.daemon = daemon
  478. self.run_once = run_once
  479. self.timeout = timeout
  480. self.idle_timeout = idle_timeout
  481. self.traffic = traffic
  482. self.launch_time = time.time()
  483. self.ws_connection = False
  484. self.handler_id = 1
  485. self.logger = self.get_logger()
  486. self.tcp_keepalive = tcp_keepalive
  487. self.tcp_keepcnt = tcp_keepcnt
  488. self.tcp_keepidle = tcp_keepidle
  489. self.tcp_keepintvl = tcp_keepintvl
  490. # Make paths settings absolute
  491. self.cert = os.path.abspath(cert)
  492. self.key = self.web = self.record = ''
  493. if key:
  494. self.key = os.path.abspath(key)
  495. if web:
  496. self.web = os.path.abspath(web)
  497. if record:
  498. self.record = os.path.abspath(record)
  499. if self.web:
  500. os.chdir(self.web)
  501. self.only_upgrade = not self.web
  502. # Sanity checks
  503. if not ssl and self.ssl_only:
  504. raise Exception("No 'ssl' module and SSL-only specified")
  505. if self.daemon and not resource:
  506. raise Exception("Module 'resource' required to daemonize")
  507. # Show configuration
  508. self.msg("WebSocket server settings:")
  509. self.msg(" - Listen on %s:%s",
  510. self.listen_host, self.listen_port)
  511. self.msg(" - Flash security policy server")
  512. if self.web:
  513. self.msg(" - Web server. Web root: %s", self.web)
  514. if ssl:
  515. if os.path.exists(self.cert):
  516. self.msg(" - SSL/TLS support")
  517. if self.ssl_only:
  518. self.msg(" - Deny non-SSL/TLS connections")
  519. else:
  520. self.msg(" - No SSL/TLS support (no cert file)")
  521. else:
  522. self.msg(" - No SSL/TLS support (no 'ssl' module)")
  523. if self.daemon:
  524. self.msg(" - Backgrounding (daemon)")
  525. if self.record:
  526. self.msg(" - Recording to '%s.*'", self.record)
  527. #
  528. # WebSocketServer static methods
  529. #
  530. @staticmethod
  531. def get_logger():
  532. return logging.getLogger("%s.%s" % (
  533. WebSocketServer.log_prefix,
  534. WebSocketServer.__class__.__name__))
  535. @staticmethod
  536. def socket(host, port=None, connect=False, prefer_ipv6=False,
  537. unix_socket=None, use_ssl=False, tcp_keepalive=True,
  538. tcp_keepcnt=None, tcp_keepidle=None, tcp_keepintvl=None):
  539. """ Resolve a host (and optional port) to an IPv4 or IPv6
  540. address. Create a socket. Bind to it if listen is set,
  541. otherwise connect to it. Return the socket.
  542. """
  543. flags = 0
  544. if host == '':
  545. host = None
  546. if connect and not (port or unix_socket):
  547. raise Exception("Connect mode requires a port")
  548. if use_ssl and not ssl:
  549. raise Exception("SSL socket requested but Python SSL module not loaded.");
  550. if not connect and use_ssl:
  551. raise Exception("SSL only supported in connect mode (for now)")
  552. if not connect:
  553. flags = flags | socket.AI_PASSIVE
  554. if not unix_socket:
  555. addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM,
  556. socket.IPPROTO_TCP, flags)
  557. if not addrs:
  558. raise Exception("Could not resolve host '%s'" % host)
  559. addrs.sort(key=lambda x: x[0])
  560. if prefer_ipv6:
  561. addrs.reverse()
  562. sock = socket.socket(addrs[0][0], addrs[0][1])
  563. if tcp_keepalive:
  564. sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  565. if tcp_keepcnt:
  566. sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT,
  567. tcp_keepcnt)
  568. if tcp_keepidle:
  569. sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE,
  570. tcp_keepidle)
  571. if tcp_keepintvl:
  572. sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL,
  573. tcp_keepintvl)
  574. if connect:
  575. sock.connect(addrs[0][4])
  576. if use_ssl:
  577. sock = ssl.wrap_socket(sock)
  578. else:
  579. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  580. sock.bind(addrs[0][4])
  581. sock.listen(100)
  582. else:
  583. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  584. sock.connect(unix_socket)
  585. return sock
  586. @staticmethod
  587. def daemonize(keepfd=None, chdir='/'):
  588. os.umask(0)
  589. if chdir:
  590. os.chdir(chdir)
  591. else:
  592. os.chdir('/')
  593. os.setgid(os.getgid()) # relinquish elevations
  594. os.setuid(os.getuid()) # relinquish elevations
  595. # Double fork to daemonize
  596. if os.fork() > 0: os._exit(0) # Parent exits
  597. os.setsid() # Obtain new process group
  598. if os.fork() > 0: os._exit(0) # Parent exits
  599. # Signal handling
  600. signal.signal(signal.SIGTERM, signal.SIG_IGN)
  601. signal.signal(signal.SIGINT, signal.SIG_IGN)
  602. # Close open files
  603. maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
  604. if maxfd == resource.RLIM_INFINITY: maxfd = 256
  605. for fd in reversed(range(maxfd)):
  606. try:
  607. if fd != keepfd:
  608. os.close(fd)
  609. except OSError:
  610. _, exc, _ = sys.exc_info()
  611. if exc.errno != errno.EBADF: raise
  612. # Redirect I/O to /dev/null
  613. os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
  614. os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
  615. os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
  616. def do_handshake(self, sock, address):
  617. """
  618. do_handshake does the following:
  619. - Peek at the first few bytes from the socket.
  620. - If the connection is Flash policy request then answer it,
  621. close the socket and return.
  622. - If the connection is an HTTPS/SSL/TLS connection then SSL
  623. wrap the socket.
  624. - Read from the (possibly wrapped) socket.
  625. - If we have received a HTTP GET request and the webserver
  626. functionality is enabled, answer it, close the socket and
  627. return.
  628. - Assume we have a WebSockets connection, parse the client
  629. handshake data.
  630. - Send a WebSockets handshake server response.
  631. - Return the socket for this WebSocket client.
  632. """
  633. ready = select.select([sock], [], [], 3)[0]
  634. if not ready:
  635. raise self.EClose("ignoring socket not ready")
  636. # Peek, but do not read the data so that we have a opportunity
  637. # to SSL wrap the socket first
  638. handshake = sock.recv(1024, socket.MSG_PEEK)
  639. #self.msg("Handshake [%s]" % handshake)
  640. if handshake == "":
  641. raise self.EClose("ignoring empty handshake")
  642. elif handshake.startswith(s2b("<policy-file-request/>")):
  643. # Answer Flash policy request
  644. handshake = sock.recv(1024)
  645. sock.send(s2b(self.policy_response))
  646. raise self.EClose("Sending flash policy response")
  647. elif handshake[0] in ("\x16", "\x80", 22, 128):
  648. # SSL wrap the connection
  649. if not ssl:
  650. raise self.EClose("SSL connection but no 'ssl' module")
  651. if not os.path.exists(self.cert):
  652. raise self.EClose("SSL connection but '%s' not found"
  653. % self.cert)
  654. retsock = None
  655. try:
  656. retsock = ssl.wrap_socket(
  657. sock,
  658. server_side=True,
  659. certfile=self.cert,
  660. keyfile=self.key)
  661. except ssl.SSLError:
  662. _, x, _ = sys.exc_info()
  663. if x.args[0] == ssl.SSL_ERROR_EOF:
  664. if len(x.args) > 1:
  665. raise self.EClose(x.args[1])
  666. else:
  667. raise self.EClose("Got SSL_ERROR_EOF")
  668. else:
  669. raise
  670. elif self.ssl_only:
  671. raise self.EClose("non-SSL connection received but disallowed")
  672. else:
  673. retsock = sock
  674. # If the address is like (host, port), we are extending it
  675. # with a flag indicating SSL. Not many other options
  676. # available...
  677. if len(address) == 2:
  678. address = (address[0], address[1], (retsock != sock))
  679. self.RequestHandlerClass(retsock, address, self)
  680. # Return the WebSockets socket which may be SSL wrapped
  681. return retsock
  682. #
  683. # WebSocketServer logging/output functions
  684. #
  685. def msg(self, *args, **kwargs):
  686. """ Output message as info """
  687. self.logger.log(logging.INFO, *args, **kwargs)
  688. def vmsg(self, *args, **kwargs):
  689. """ Same as msg() but as debug. """
  690. self.logger.log(logging.DEBUG, *args, **kwargs)
  691. def warn(self, *args, **kwargs):
  692. """ Same as msg() but as warning. """
  693. self.logger.log(logging.WARN, *args, **kwargs)
  694. #
  695. # Events that can/should be overridden in sub-classes
  696. #
  697. def started(self):
  698. """ Called after WebSockets startup """
  699. self.vmsg("WebSockets server started")
  700. def poll(self):
  701. """ Run periodically while waiting for connections. """
  702. #self.vmsg("Running poll()")
  703. pass
  704. def terminate(self):
  705. raise self.Terminate()
  706. def multiprocessing_SIGCHLD(self, sig, stack):
  707. self.vmsg('Reaing zombies, active child count is %s', len(multiprocessing.active_children()))
  708. def fallback_SIGCHLD(self, sig, stack):
  709. # Reap zombies when using os.fork() (python 2.4)
  710. self.vmsg("Got SIGCHLD, reaping zombies")
  711. try:
  712. result = os.waitpid(-1, os.WNOHANG)
  713. while result[0]:
  714. self.vmsg("Reaped child process %s" % result[0])
  715. result = os.waitpid(-1, os.WNOHANG)
  716. except (OSError):
  717. pass
  718. def do_SIGINT(self, sig, stack):
  719. self.msg("Got SIGINT, exiting")
  720. self.terminate()
  721. def do_SIGTERM(self, sig, stack):
  722. self.msg("Got SIGTERM, exiting")
  723. self.terminate()
  724. def top_new_client(self, startsock, address):
  725. """ Do something with a WebSockets client connection. """
  726. # handler process
  727. client = None
  728. try:
  729. try:
  730. client = self.do_handshake(startsock, address)
  731. except self.EClose:
  732. _, exc, _ = sys.exc_info()
  733. # Connection was not a WebSockets connection
  734. if exc.args[0]:
  735. self.msg("%s: %s" % (address[0], exc.args[0]))
  736. except WebSocketServer.Terminate:
  737. raise
  738. except Exception:
  739. _, exc, _ = sys.exc_info()
  740. self.msg("handler exception: %s" % str(exc))
  741. self.vmsg("exception", exc_info=True)
  742. finally:
  743. if client and client != startsock:
  744. # Close the SSL wrapped socket
  745. # Original socket closed by caller
  746. client.close()
  747. def start_server(self):
  748. """
  749. Daemonize if requested. Listen for for connections. Run
  750. do_handshake() method for each connection. If the connection
  751. is a WebSockets client then call new_websocket_client() method (which must
  752. be overridden) for each new client connection.
  753. """
  754. lsock = self.socket(self.listen_host, self.listen_port, False,
  755. self.prefer_ipv6,
  756. tcp_keepalive=self.tcp_keepalive,
  757. tcp_keepcnt=self.tcp_keepcnt,
  758. tcp_keepidle=self.tcp_keepidle,
  759. tcp_keepintvl=self.tcp_keepintvl)
  760. if self.daemon:
  761. self.daemonize(keepfd=lsock.fileno(), chdir=self.web)
  762. self.started() # Some things need to happen after daemonizing
  763. # Allow override of signals
  764. original_signals = {
  765. signal.SIGINT: signal.getsignal(signal.SIGINT),
  766. signal.SIGTERM: signal.getsignal(signal.SIGTERM),
  767. signal.SIGCHLD: signal.getsignal(signal.SIGCHLD),
  768. }
  769. signal.signal(signal.SIGINT, self.do_SIGINT)
  770. signal.signal(signal.SIGTERM, self.do_SIGTERM)
  771. if not multiprocessing:
  772. # os.fork() (python 2.4) child reaper
  773. signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD)
  774. else:
  775. # make sure that _cleanup is called when children die
  776. # by calling active_children on SIGCHLD
  777. signal.signal(signal.SIGCHLD, self.multiprocessing_SIGCHLD)
  778. last_active_time = self.launch_time
  779. try:
  780. while True:
  781. try:
  782. try:
  783. startsock = None
  784. pid = err = 0
  785. child_count = 0
  786. if multiprocessing:
  787. # Collect zombie child processes
  788. child_count = len(multiprocessing.active_children())
  789. time_elapsed = time.time() - self.launch_time
  790. if self.timeout and time_elapsed > self.timeout:
  791. self.msg('listener exit due to --timeout %s'
  792. % self.timeout)
  793. break
  794. if self.idle_timeout:
  795. idle_time = 0
  796. if child_count == 0:
  797. idle_time = time.time() - last_active_time
  798. else:
  799. idle_time = 0
  800. last_active_time = time.time()
  801. if idle_time > self.idle_timeout and child_count == 0:
  802. self.msg('listener exit due to --idle-timeout %s'
  803. % self.idle_timeout)
  804. break
  805. try:
  806. self.poll()
  807. ready = select.select([lsock], [], [], 1)[0]
  808. if lsock in ready:
  809. startsock, address = lsock.accept()
  810. else:
  811. continue
  812. except self.Terminate:
  813. raise
  814. except Exception:
  815. _, exc, _ = sys.exc_info()
  816. if hasattr(exc, 'errno'):
  817. err = exc.errno
  818. elif hasattr(exc, 'args'):
  819. err = exc.args[0]
  820. else:
  821. err = exc[0]
  822. if err == errno.EINTR:
  823. self.vmsg("Ignoring interrupted syscall")
  824. continue
  825. else:
  826. raise
  827. if self.run_once:
  828. # Run in same process if run_once
  829. self.top_new_client(startsock, address)
  830. if self.ws_connection :
  831. self.msg('%s: exiting due to --run-once'
  832. % address[0])
  833. break
  834. elif multiprocessing:
  835. self.vmsg('%s: new handler Process' % address[0])
  836. p = multiprocessing.Process(
  837. target=self.top_new_client,
  838. args=(startsock, address))
  839. p.start()
  840. # child will not return
  841. else:
  842. # python 2.4
  843. self.vmsg('%s: forking handler' % address[0])
  844. pid = os.fork()
  845. if pid == 0:
  846. # child handler process
  847. self.top_new_client(startsock, address)
  848. break # child process exits
  849. # parent process
  850. self.handler_id += 1
  851. except (self.Terminate, SystemExit, KeyboardInterrupt):
  852. self.msg("In exit")
  853. break
  854. except Exception:
  855. self.msg("handler exception: %s", str(exc))
  856. self.vmsg("exception", exc_info=True)
  857. finally:
  858. if startsock:
  859. startsock.close()
  860. finally:
  861. # Close listen port
  862. self.vmsg("Closing socket listening at %s:%s",
  863. self.listen_host, self.listen_port)
  864. lsock.close()
  865. # Restore signals
  866. for sig, func in original_signals.items():
  867. signal.signal(sig, func)