Toggle Navigation
Hatchery
Eggs
CityControlTest
uso_protocol.py
Users
Badges
Login
Register
uso_protocol.py
raw
Content
""" Socket.io/Engine.io protocol constructs """ PAYLOAD_STRING = 0 PAYLOAD_BINARY = 1 PACKET_OPEN = 0 PACKET_CLOSE = 1 PACKET_PING = 2 PACKET_PONG = 3 PACKET_MESSAGE = 4 PACKET_UPGRADE = 5 PACKET_NOOP = 6 MESSAGE_CONNECT = 0 MESSAGE_DISCONNECT = 1 MESSAGE_EVENT = 2 MESSAGE_ACK = 3 MESSAGE_ERROR = 4 MESSAGE_BINARY_EVENT = 5 MESSAGE_BINARY_ACK = 6 def decode_packet(buf): if isinstance(buf, str) and buf[0] == "b": # FIXME: implement base64 protocol raise NotImplementedError() return int(buf[0]), buf[1:] def decode_payload(buf): buf = memoryview(buf) while buf: packettype = int(chr(buf[0])) length = 0 while True: # c = buf[0] # buf = buf[1:] # Octet stream = 0xff, Application/text = ascii 125 curly brace # if c == 0xff: c = buf[length] if c == 125: length += 1 break length += 1 # length *= 10 # length += c packet = bytes(buf[:length]) if packettype == PAYLOAD_STRING: packet = packet.decode("utf-8") elif packettype == PAYLOAD_BINARY: pass else: raise NotImplementedError() yield decode_packet(packet) buf = buf[length:]