Toggle Navigation
Hatchery
Eggs
CityControl
uso_transport.py
Users
Badges
Login
Register
uso_transport.py
raw
Content
"""SocketIO transport.""" import json as json import socket as socket import uasyncio import uws_client from uso_protocol import * import sys import errno import utils class SocketIO: """SocketIO transport.""" def __init__(self, uri, reconnect=True, **params): self.uri = uri self.sid = params['sid'] self.should_run = True self.reconnect = reconnect self.websocket = uasyncio.run(uws_client.connect(uri)) print("Websocket after connect:", self.websocket) # Event handlers map from event -> [handlers, ...] self._event_handlers = {} # Interval handlers [(interval, handler), ...] self._interval_handlers = [] # Register a ping event self.at_interval(params['pingInterval'] // 1000)(self.ping) def __enter__(self): return self def __exit__(self, exc_type, exc, tb): self.close() async def close(self): print("Closing websocket") self.should_run = False if self.websocket.open: await self.websocket.close() print("Websocket closed") async def send(self, message): await self.emit('message', message) async def emit(self, event, data=None): print("Emitting: %s%s%s" % (MESSAGE_EVENT, event, data)) await self._send_message(MESSAGE_EVENT, (event, data)) async def run_forever(self): """Main loop for SocketIO.""" print("Entering SocketIO event loop") counter = 0 # send a connection event await self._handle_event('connection') while self.should_run and (self.websocket.open or self.reconnect): if not self.websocket.open: print("Reconnecting in SocketIO event loop") self.websocket = await uws_client.connect(self.uri) try: packet_type, data = await self._recv() await self._handle_packet(packet_type, data) except Exception as exception: print("Exception when handling incoming SocketIO packet:") sys.print_exception(exception) await self.emit("exception", utils.get_exception_string(exception)) counter += 1 counter %= 1000000 for interval, func in self._interval_handlers: if counter % interval == 0: await func() await uasyncio.sleep_ms(50) print("Exiting SocketIO event loop") async def _handle_packet(self, packet_type, data): if packet_type is None: pass # elif packet_type == PACKET_MESSAGE: message_type, data = decode_packet(data) await self._handle_message(message_type, data) elif packet_type == PACKET_CLOSE: print("Socket.io closed") await self.close() elif packet_type == PACKET_PING: print("< ping") try: await self._send_packet(PACKET_PONG, data) except OSError as err: if (err.errno == errno.ENOTCONN or err.errno == errno.ECONNRESET) and self.reconnect: print("Reconnecting when handling ping message") self.websocket = await uws_client.connect(self.uri) await self._send_packet(PACKET_PONG, data) else: raise err print("> pong") elif packet_type == PACKET_PONG: print("< pong") elif packet_type == PACKET_NOOP: pass else: print("Unhandled packet %s: %s" % (packet_type, data)) async def _handle_message(self, message_type, data): if message_type == MESSAGE_EVENT: message = json.loads(data) if len(message) == 2: event = message[0] data = message[1] elif len(message) == 1: event = message[0] data = None else: print("Message length not according to expectation") return # Build the nested if above to fix an issue in which # event, data = json.loads() can only return 1 var whilst expecting 2 #print('some: ' + str(some)) #event, data = json.loads(data) await self._handle_event(event, data) elif message_type == MESSAGE_ERROR: print("Error: %s" % data) elif message_type == MESSAGE_DISCONNECT: print("Disconnected") await self.close() else: print("Unhandled message %s: %s" % (message_type, data)) async def _handle_event(self, event, data=None): print("Handling event '%s'" % event) for handler in self._event_handlers.get(event, []): print("Calling handler %s for event '%s'" % (handler, event)) await handler(data) async def _send_packet(self, packet_type, data=''): if not self.websocket.open: print("Reconnecting websocket before sending packet") self.websocket = await uws_client.connect(self.uri) await self.websocket.send('{}{}'.format(packet_type, data)) async def _send_message(self, message_type, data=None): await self._send_packet(PACKET_MESSAGE, '{}{}'.format(message_type, '' if data is None else json.dumps(data))) async def ping(self): print("> ping") await self._send_packet(PACKET_PING) async def _recv(self): """Receive a packet.""" try: # self.websocket.settimeout(1) packet = await self.websocket.recv() if packet: return decode_packet(packet) else: return None, None except OSError: return None, None finally: pass # self.websocket.settimeout(None) def on(self, event): """Register an event handler with the socket.""" def inner(func): print("Registered %s to handle %s" % (func, event)) self._event_handlers.setdefault(event, []).append(func) return inner def at_interval(self, interval): """Register an event handler to happen at an interval.""" def inner(func): print("Registered %s to run at interval %s" % (func, interval)) self._interval_handlers.append((interval, func)) return inner