Toggle Navigation
Hatchery
Eggs
CityControlTest
uws_protocol.py
Users
Badges
Login
Register
uws_protocol.py
raw
Content
""" Websockets protocol """ import re import struct import random import sys from collections import namedtuple # Opcodes OP_CONT = 0x0 OP_TEXT = 0x1 OP_BYTES = 0x2 OP_CLOSE = 0x8 OP_PING = 0x9 OP_PONG = 0xA # Close codes CLOSE_OK = 1000 CLOSE_GOING_AWAY = 1001 CLOSE_PROTOCOL_ERROR = 1002 CLOSE_DATA_NOT_SUPPORTED = 1003 CLOSE_BAD_DATA = 1007 CLOSE_POLICY_VIOLATION = 1008 CLOSE_TOO_BIG = 1009 CLOSE_MISSING_EXTN = 1010 CLOSE_BAD_CONDITION = 1011 URL_RE = re.compile(r"(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?") URI = namedtuple("URI", ("protocol", "hostname", "port", "path")) class NoDataException(Exception): pass class ConnectionClosed(Exception): pass def urlparse(uri): """Parse ws:// URLs""" match = URL_RE.match(uri) if match: protocol = match.group(1) host = match.group(2) port = match.group(3) path = match.group(4) if protocol == "wss": if port is None: port = 443 elif protocol == "ws": if port is None: port = 80 else: raise ValueError("Scheme {} is invalid".format(protocol)) return URI(protocol, host, int(port), path) class Websocket: """ Basis of the Websocket protocol. This can probably be replaced with the C-based websocket module, but this one currently supports more options. """ is_client = False def __init__(self, reader, writer): self.reader = reader self.writer = writer self.open = True def __enter__(self): return self def __exit__(self, exc_type, exc, tb): self.close() def settimeout(self, timeout): # Figure out if we need/want this # Original: self.sock.settimeout(timeout) pass async def read_frame(self, max_size=None): """ Read a frame from the socket. See https://tools.ietf.org/html/rfc6455#section-5.2 for the details. """ # Frame header two_bytes = await self.reader.read(2) if not two_bytes: raise NoDataException byte1, byte2 = struct.unpack("!BB", two_bytes) # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) fin = bool(byte1 & 0x80) opcode = byte1 & 0x0F # Byte 2: MASK(1) LENGTH(7) mask = bool(byte2 & (1 << 7)) length = byte2 & 0x7F if length == 126: # Magic number, length header is 2 bytes (length,) = struct.unpack("!H", await self.reader.read(2)) elif length == 127: # Magic number, length header is 8 bytes (length,) = struct.unpack("!Q", await self.reader.read(8)) if mask: # Mask is 4 bytes mask_bits = await self.reader.read(4) try: data = await self.reader.read(length) except MemoryError: # We can't receive this many bytes, close the socket print("Frame of length %s too big. Closing" % length) self.close(code=CLOSE_TOO_BIG) return True, OP_CLOSE, None if mask: data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data)) return fin, opcode, data async def write_frame(self, opcode, data=b""): """ Write a frame to the socket. See https://tools.ietf.org/html/rfc6455#section-5.2 for the details. """ fin = True mask = self.is_client # messages sent by client are masked _data = data length = len(data) # Frame header # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) byte1 = 0x80 if fin else 0 byte1 |= opcode # Byte 2: MASK(1) LENGTH(7) byte2 = 0x80 if mask else 0 if length < 126: # 126 is magic value to use 2-byte length header byte2 |= length await self.writer.awrite(struct.pack("!BB", byte1, byte2)) # self.sock.write(struct.pack('!BB', byte1, byte2)) elif length < (1 << 16): # Length fits in 2-bytes byte2 |= 126 # Magic code await self.writer.awrite(struct.pack("!BBH", byte1, byte2, length)) elif length < (1 << 64): byte2 |= 127 # Magic code await self.writer.awrite(struct.pack("!BBQ", byte1, byte2, length)) else: raise ValueError() if mask: # Mask is 4 bytes mask_bits = struct.pack("!I", random.getrandbits(32)) await self.writer.awrite(mask_bits) data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data)) await self.writer.awrite(data) try: await self.writer.drain() except OSError as oserror: print("Connection to the backend failed:") # No use sending this exception to the backend, connection is closed sys.print_exception(oserror) async def recv(self): """ Receive data from the websocket. This is slightly different from 'websockets' in that it doesn't fire off a routine to process frames and put the data in a queue. If you don't call recv() sufficiently often you won't process control frames. """ assert self.open while self.open: try: fin, opcode, data = await self.read_frame() except NoDataException: return "" except ValueError: print("Failed to read frame. Socket dead.") self._close() raise ConnectionClosed() if not fin: raise NotImplementedError() if opcode == OP_TEXT: return data.decode("utf-8") elif opcode == OP_BYTES: return data elif opcode == OP_CLOSE: self._close() return elif opcode == OP_PONG: # Ignore this frame, keep waiting for a data frame continue elif opcode == OP_PING: # We need to send a pong frame print("Sending PONG") await self.write_frame(OP_PONG, data) # And then wait to receive continue elif opcode == OP_CONT: # This is a continuation of a previous frame raise NotImplementedError(opcode) else: raise ValueError(opcode) async def send(self, buf): """Send data to the websocket.""" assert self.open if isinstance(buf, str): opcode = OP_TEXT buf = buf.encode("utf-8") elif isinstance(buf, bytes): opcode = OP_BYTES else: raise TypeError() await self.write_frame(opcode, buf) async def close(self, code=CLOSE_OK, reason=""): """Close the websocket.""" if not self.open: return buf = struct.pack("!H", code) + reason.encode("utf-8") await self.write_frame(OP_CLOSE, buf) await self._close() async def _close(self): print("Connection closed") self.open = False self.reader.close() self.writer.close() await self.reader.wait_closed()