Toggle Navigation
Hatchery
Eggs
CityControlTest
uso_client.py
Users
Badges
Login
Register
uso_client.py
raw
Content
""" Micropython Socket.IO client. """ import re as re import json as json import socket as socket import uasyncio from collections import namedtuple from uso_protocol import * from uso_transport import SocketIO URL_RE = re.compile(r"(http|https)://([A-Za-z0-9\-\.]+)(?:\:([0-9]+))?(/.+)?") URI = namedtuple("URI", ("protocol", "hostname", "port", "path")) def urlparse(uri): """Parse http:// URLs""" match = URL_RE.match(uri) if match: port = match.group(3) if port is None: port = 80 if match.group(1) == "http" else 443 return URI(match.group(1), match.group(2), int(port), match.group(4)) def _connect_http(protocol, hostname, port, path): """Stage 1 do the HTTP connection to get our SID""" try: sock = socket.socket() addr = socket.getaddrinfo(hostname, port) if protocol == "https": # sock = ssl.wrap_socket(sock) raise Exception("SSL not supported for now") sock.connect(addr[0][4]) def send_header(header, *args): # print(str(header) % args) sock.write(header % args + '\r\n') send_header("GET %s HTTP/1.1", path) send_header("Host: %s:%s", hostname, port) send_header("") # Unix adoption to read line by line # We have to make a file object which exposes file.readline() file = sock.makefile("rb") header = file.readline() # header = sock.readline()[:-2] # Unix adoption: added \r\n assert header == b"HTTP/1.1 200 OK\r\n", header length = None # print("Received response, headers:") while header: header = file.readline()[:-2] # print(header.decode("UTF-8")) if not header: break header, value = header.split(b": ") header = header.lower() if header == b"content-type": # assert value == b'application/octet-stream' # Unix adopttion: Assertion value: application/octet-stream assert value == b"text/plain; charset=UTF-8" elif header == b"content-length": length = int(value) data = file.read(length) # data = sock.recv(40960) # return data return decode_payload(data) finally: sock.close() def connect(uri): """Connect to a socket IO server.""" uri = urlparse(uri) assert uri path = uri.path or "/" + "socket.io/?EIO=4" # Start a HTTP connection, which will give us an SID to use to upgrade # the websockets connection packets = _connect_http(uri.protocol, uri.hostname, uri.port, path) # The first packet should open the connection, # following packets might be initialisation messages for us packet_type, params = next(packets) assert packet_type == PACKET_OPEN params = json.loads(params) print("Websocket parameters = %s" % params) assert "websocket" in params["upgrades"] sid = params["sid"] path += "&sid={}".format(sid) print("Connecting to websocket SID %s" % sid) # Unix adaption if uri.protocol == "http": ws_protocol = "ws" else: ws_protocol = "wss" # Start a websocket and send a probe on it ws_uri = ws_protocol + "://{hostname}:{port}{path}&transport=websocket".format( hostname=uri.hostname, port=uri.port, path=path ) socketio = SocketIO(ws_uri, **params) # handle rest of the packets once we're in the main loop @socketio.on("connection") def on_connect(data): print("Handling on connect") for packet_type, data in packets: print("Handling on connect packet: (%s, %s)" % (packet_type, data)) await socketio._handle_packet(packet_type, data) print("Sending Ping") uasyncio.run(socketio._send_packet(PACKET_PING, "probe")) print("Sent Ping") # We should receive an answer to our probe print("Checking for PONG packet") packet = uasyncio.run(socketio._recv()) assert packet == (PACKET_PONG, "probe") print("Received PONG packet") # Send a follow-up poll _connect_http(uri.protocol, uri.hostname, uri.port, path + "&transport=polling") # Upgrade the connection print("Sending WS Upgrade packet") uasyncio.run(socketio._send_packet(PACKET_UPGRADE)) print("Sent WS Upgrade packet") print("Checking for NOOP packet") packet = uasyncio.run(socketio._recv()) assert packet == (PACKET_NOOP, "") print("Received NOOP packet") return socketio