Source code for mutaprops.managers

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import urllib.parse
from aiohttp import web, ClientSession, WSMsgType, WSServerHandshakeError,\
    ClientOSError
from .mutaprops import MutaPropError, MutaPropClass, MutaAction, MutaTypes
from collections import OrderedDict
import threading
import sockjs
import json
import logging
import os


[docs]class MutaManagerError(MutaPropError): pass
[docs]class HttpMutaManager(object): """ Manages HTML5 gateway for controlling the MutaObjects. Each MutaObject is made accessible as REST API with websocket (SockJS) downstream channel for notifications about model update. Managers can be chained by specifying a "master manager" upon initialization. Master manager can see and manipulate the slave manager's MutaObjects. One Manager can have only one master, but can be master of many slaves. Chains can be of any length, but in practice the feasibility of long chains will be limited by the HTTP response times etc. """ WEB_ASSETS = os.path.join(os.path.dirname(__file__), r"web_ui/dist/") INDEX_FILE = open(os.path.join(os.path.dirname(__file__), 'web_ui', 'index.html'), 'rb').read() NOTIFICATION_PROPERTY_CHANGE = 'property_change' NOTIFICATION_EXTERNAL_CHANGE = 'external_change' NOTIFICATION_LOG_MESSAGE = 'log' NOTIFICATION_OBJECTS_CHANGE = 'objects_change' NOTIFICATION_TERMINATION = 'terminated' HEADER_SUPERVISOR = "muta-supervisor" EVENT_SOURCE_OBJECT = "object" EVENT_SOURCE_MASTER = "master" EVENT_SOURCE_USER = "user"
[docs] class WsHandler(logging.Handler): """ Handler to forward logging messages over websocket.""" def __init__(self, msg_callback, level=logging.NOTSET): self._msg_callback = msg_callback super().__init__(level)
[docs] def emit(self, record): self._msg_callback(HttpMutaManager.NOTIFICATION_LOG_MESSAGE, **record.__dict__)
def __init__(self, name, loop=None, master=None, local_dir=None, help_doc=None, proxy_log=None, log_level=logging.NOTSET): """ :param name: Name displayed in the UI top menu. Optional: :param loop: Asyncio loop. If not specified, a new one will be created. :param master: ``http://masteraddr:port`` Address of the master controller. :param local_dir: Path to the directory which will be made accessible in the webserver as ``/local`` :param help_doc: String A HTML code to be displayed in the help window. :param proxy_log: A :class:`logging.Logger` instance to be forwarded to the UI. :param log_level: A log level to be displayed at the UI level. """ self._name = name self._loop = loop or asyncio.get_event_loop() self._app = web.Application(loop=self._loop) self._muta_objects = OrderedDict() self._init_router(local_dir=local_dir) self._sockjs_manager = None self._logger = logging.getLogger(HttpMutaManager.__class__.__name__) self._manager_proxies = {} self._proxy_reconnector_task = None self._master_manager = master self._host_addr = None self._host_port = None self._proxy_logger = None self._help_doc = help_doc # Logging if proxy_log is None: # Get root logger self._proxy_logger = logging.getLogger() else: self._proxy_logger = proxy_log self._proxy_logger.addHandler(HttpMutaManager.WsHandler( self._send_notification, log_level)) @asyncio.coroutine def _get_app_name(self, request): return web.Response(text=self._name) @asyncio.coroutine def _get_help_doc(self, request): return web.Response(text=self._help_doc) @asyncio.coroutine def _get_object_list(self, request): # Don't know if it's better to return empty list or 204... # if not self._muta_objects: # return web.HTTPNoContent() # No objects defined # temp = {'objects': [obj.muta_id for obj in self._muta_objects]} # temp = [obj.muta_id for obj in self._muta_objects] temp = list(self._muta_objects.keys()) return web.json_response(temp) def _find_object(self, request): return self._muta_objects[request.match_info['obj_id']] @asyncio.coroutine def _get_object(self, request): try: obj = self._find_object(request) if isinstance(obj, HttpMutaObjectProxy): return (yield from obj.get_object()) else: return web.json_response(obj.to_dict()) except (KeyError, AssertionError): return web.HTTPNotFound() @asyncio.coroutine def _get_props(self, request): try: temp_obj = self._find_object(request) if isinstance(temp_obj, HttpMutaObjectProxy): return (yield from temp_obj.get_props()) else: temp_props = [prop.to_dict(obj=temp_obj) for prop in temp_obj.props.values()] return web.json_response(temp_props) except (KeyError, AssertionError): return web.HTTPNotFound() def _find_prop(self, obj, request): return obj.props[request.match_info['prop_id']] @asyncio.coroutine def _get_prop(self, request): try: temp_obj = self._find_object(request) if isinstance(temp_obj, HttpMutaObjectProxy): return (yield from temp_obj.get_prop( request.match_info['prop_id'])) else: return web.json_response( self._find_prop(temp_obj, request).to_dict(obj=temp_obj)) except (KeyError, AssertionError): return web.HTTPNotFound() @asyncio.coroutine def _get_prop_value(self, request): try: temp_obj = self._find_object(request) if isinstance(temp_obj, HttpMutaObjectProxy): return (yield from temp_obj.get_prop_value( request.match_info['prop_id'])) else: return web.json_response( self._find_prop(temp_obj, request).__get__(temp_obj)) except (KeyError, AssertionError): return web.HTTPNotFound() @asyncio.coroutine def _set_prop_value(self, request): try: temp_obj = self._find_object(request) value = urllib.parse.parse_qs(request.query_string)['value'][0] if isinstance(temp_obj, HttpMutaObjectProxy): return (yield from temp_obj.set_prop_value( request.match_info['prop_id'], value)) else: temp_prop = self._find_prop(temp_obj, request) if not temp_prop.is_writeable(): return web.HTTPMethodNotAllowed( 'value', [], text="Property is read only.") # The setting of property itself value = MutaTypes.typecast(temp_prop.value_type, value) set_result = temp_prop.muta_set(temp_obj, value) # In case of this action being from master manager, # update the UI if self.HEADER_SUPERVISOR == request.headers: self._property_change(temp_obj.muta_id, temp_prop.prop_id, value, self.EVENT_SOURCE_MASTER) else: # The change was done by user on UI, we have to # notify the other clients or master UIs self._property_change(temp_obj.muta_id, temp_prop.prop_id, value, self.EVENT_SOURCE_USER) # TODO: This should translate muta_set validation result to # HTTP Resp. return web.json_response(set_result) except (KeyError, AssertionError): return web.HTTPNotFound() @asyncio.coroutine def _set_prop_action(self, request): # TODO broadcast notification in case a "supervisor header" is present try: temp_obj = self._find_object(request) if isinstance(temp_obj, HttpMutaObjectProxy): return (yield from temp_obj.set_prop_action( request.match_info['prop_id'])) else: temp_prop = self._find_prop(temp_obj, request) if isinstance(temp_prop, MutaAction): temp_prop.muta_call(temp_obj) return web.HTTPOk() else: return web.HTTPMethodNotAllowed( "action", ['value', ], text="Resource is not MutaAction.") except (KeyError, AssertionError): return web.HTTPNotFound() @asyncio.coroutine def _register_remote_manager(self, request): data = yield from request.json() self._logger.debug("Registering remote manager %s" % data) remote_address = data.get('address', None) if remote_address: # Check for a loop (remote address is also own master) if remote_address == self._master_manager: raise MutaManagerError("Remote proxy address cannot be " + "the same as the master manager.") try: # Add manager to the list of proxies temp_man = HttpManagerProxy(remote_address) self._add_manager_proxy(temp_man) # Attach the manager yield from temp_man.attach(self) return web.HTTPOk() except MutaManagerError as e: return web.HTTPError(reason=str(e)) else: return web.HTTPBadRequest() @asyncio.coroutine def _remote_manager_reconnector(self, period=10): """ Periodically try to re-connect to remote managers which got reconnected. :param period: [seconds] Time interval between reconnect attempts. """ while True: yield from asyncio.sleep(10) for addr, proxy in self._manager_proxies.items(): if (not proxy.is_attached) and (not proxy.is_being_removed): self._logger.debug( "Attempting reconnection to remote manager @ %s" % addr) try: yield from proxy.attach(self) except Exception as e: self._logger.debug("Reconnect failed with %s" % e) def _add_manager_proxy(self, proxy): if proxy.address not in self._manager_proxies: self._manager_proxies[proxy.address] = proxy else: raise MutaManagerError("Proxy already exists for that name.") def _is_master_connected(self): for addr, proxy in self._manager_proxies.items(): if proxy.is_attached: return True @asyncio.coroutine def _remove_manager_proxy(self, proxy): temp = self._manager_proxies.pop(proxy.address) if temp: yield from temp.detach() def _sockjs_handler(self, msg, session): """ SockJS handler is now not doing anything because we onlu use SockJS for downstream. :param session: :return: """ if msg.tp == sockjs.MSG_OPEN: self._sockjs_manager = session.manager # session.manager.broadcast("Someone joined.") elif msg.tp == sockjs.MSG_CLOSED: self._sockjs_manager = None # session.manager.broadcast("Someone left.") def _property_change(self, obj_id, prop_id, value, event_source=EVENT_SOURCE_OBJECT): self._send_notification(self.NOTIFICATION_PROPERTY_CHANGE, objId=obj_id, propId=prop_id, value=value, eventSource=event_source) self._logger.debug("Property {0} changed value to {1} on {2}".format( obj_id, prop_id, value)) def _send_notification(self, msg_type, **kwargs): temp = {'type': msg_type, 'params': kwargs} self._send_ws_message(temp) def _send_ws_message(self, msg): if self._sockjs_manager: self._sockjs_manager.broadcast(msg) def _index(self, request): return web.Response(body=self.INDEX_FILE, content_type='text/html') def _init_router(self, local_dir=None): self._app.router.add_get('/', self._index) self._app.router.add_static('/dist', self.WEB_ASSETS, show_index=True) if local_dir: self._app.router.add_static('/local', local_dir, show_index=True) self._app.router.add_get('/api/appname', self._get_app_name) self._app.router.add_get('/api/help', self._get_help_doc) self._app.router.add_get('/api/objects', self._get_object_list) self._app.router.add_get('/api/objects/{obj_id}', self._get_object) self._app.router.add_get('/api/objects/{obj_id}/props', self._get_props) self._app.router.add_get('/api/objects/{obj_id}/props/{prop_id}', self._get_prop) self._app.router.add_get('/api/objects/{obj_id}/props/{prop_id}/value', self._get_prop_value) self._app.router.add_put('/api/objects/{obj_id}/props/{prop_id}', self._set_prop_value) # http://programmers.stackexchange.com/questions/141410/restful-state-changing-actions self._app.router.add_put('/api/objects/{obj_id}/props/{prop_id}/action', self._set_prop_action) self._app.router.add_post('/api/remote', self._register_remote_manager) sockjs.add_endpoint(self._app, self._sockjs_handler, name='notifier', prefix='/api/notifications/')
[docs] def add_object(self, muta_object, obj_id=None): """ Add decorated object to the UI manager. :param muta_object: An instance of a class decorated with :func:`~mutaprops.decorators.mutaprop_class`. Optional: :param obj_id: 'Id to be used for the added `muta_object`. """ self._logger.debug("Adding object") # Somewhat unnecessarily complicated checking if not (isinstance(muta_object, MutaPropClass) or isinstance(muta_object, HttpMutaObjectProxy)): raise MutaPropError("Object is not MutaClass instance.") if not muta_object.is_muta_ready() and obj_id is None: raise MutaPropError("MutaObject is not initialized. " + "Provide obj_id or initialize externally.") if muta_object.is_muta_ready(): if obj_id: muta_object.muta_init(obj_id, self._property_change) else: muta_object.muta_init(muta_object.muta_id, self._property_change) else: muta_object.muta_init(obj_id, self._property_change) # Check that we won't have two objects with the same id if muta_object.muta_id in self._muta_objects: raise MutaPropError( "MutaObject with id {0} is already registered.".format( muta_object.muta_id)) self._muta_objects[muta_object.muta_id] = muta_object self._send_notification(self.NOTIFICATION_OBJECTS_CHANGE, objId=muta_object.muta_id, action='added') self._logger.debug("Added object %s" % muta_object.muta_id)
[docs] def remove_object(self, muta_object): """ Remove object from the UI manager. (Causes object to disappear from the UI). :param muta_object: Object to be removed. """ try: temp = self._muta_objects.pop(muta_object.muta_id) temp.muta_unregister() self._send_notification(self.NOTIFICATION_OBJECTS_CHANGE, objId=temp.muta_id, action='removed') self._logger.debug("Removed object %s" % temp.muta_id) except KeyError: raise MutaManagerError("Object is not managed.")
@asyncio.coroutine def _on_shutdown(self, app): self._logger.debug("On Shutdown got called...") # Broadcast termination self._send_notification(self.NOTIFICATION_TERMINATION) # Close all proxies for addr, proxy in self._manager_proxies.items(): self._remove_manager_proxy(proxy) # Stop the reconnector yield from self._proxy_reconnector_task.cancel() # Remove all objects objects_to_remove = list(self._muta_objects.values()) for obj in objects_to_remove: self.remove_object(obj)
[docs] @asyncio.coroutine def register_on_master(self, master_addr): # I know that it's not good to re-load session for single request, # but in this case it's so infrequent it doesn't matter temp_session = ClientSession(loop=self._app.loop) resp = yield from temp_session.post(master_addr + '/api/remote', data=json.dumps({'address': "http://{0}:{1}".format(self._host_addr, self._host_port)})) if resp.status != 200: raise MutaManagerError("Couldn't register to the master") yield from temp_session.close()
def _run(self, host='0.0.0.0', port='8080'): # http://aiohttp.readthedocs.io/en/stable/_modules/aiohttp/web.html?highlight=run_app self._app.on_shutdown.append(self._on_shutdown) handler = self._app.make_handler() # Task for proxy reconnector self._proxy_reconnector_task = asyncio.async( self._remote_manager_reconnector(), loop=self._app.loop) # Now run it all self._host_addr = host self._host_port = port if self._master_manager: asyncio.async(self.register_on_master(self._master_manager), loop=self._app.loop) print("Server starting at http://{0}:{1}".format(host, port)) self._app.loop.run_until_complete(asyncio.gather( self._app.loop.create_server(handler, host, port), self._proxy_reconnector_task)) # self._app.loop.run_forever()
[docs] def run(self, **aiohttp_kwargs): """ Run the manager. :param aiohttp_kwargs: HTTP server parameters as defined for aiohttp `web.run_app <http://aiohttp.readthedocs.io/en/stable/web_reference.html#aiohttp.web.run_app>`_ """ # loop.run_forever() # web.run_app(self._app, **aiohttp_kwargs) self._run(**aiohttp_kwargs)
[docs] def run_in_thread(self, **aiohttp_kwargs): """ Run the UI manager in a separate thread. Theoretically this allows to run the UI for code which is otherwise incompatible with Asyncio. In practice, this is a minefield and it was never properly tested. :param aiohttp_kwargs: HTTP server parameters as defined for aiohttp `web.run_app <http://aiohttp.readthedocs.io/en/stable/web_reference.html#aiohttp.web.run_app>`_ """ t = threading.Thread(target=self._run, kwargs=aiohttp_kwargs) t.start()
[docs] def shutdown(self): self._logger.debug("Shutting down the HttpManager...") self._app.shutdown()
[docs]class HttpManagerProxy: """ Utility class representing a remote (slave) HTTP manager. """ def __init__(self, address): self._address = address self._session = None self._logger = logging.getLogger(self.__class__.__name__) self._registered_objects = {} self._host_manager = None self._is_attached = False self._ws = None self._ws_man = None self._is_being_removed = False @property def session(self): return self._session @property def is_attached(self): return self._is_attached @property def address(self): return self._address @property def is_being_removed(self): """ Host manager checks the flag to know if it shall try to reconnect with the remote manager. """ return self._is_being_removed def _add_remote_object(self, obj): obj_proxy = HttpMutaObjectProxy(self, obj) try: self._host_manager.add_object(obj_proxy) except MutaPropError: # The object may already be there sometimes pass self._registered_objects[obj_proxy.muta_id] = obj_proxy def _remove_remote_object(self, obj): obj_proxy = self._registered_objects.pop(obj, None) if obj_proxy: self._host_manager.remove_object(obj_proxy)
[docs] @asyncio.coroutine def attach(self, host_manager): """ Attaches itself to the host manager, by making it's own remote MutaObjects part of the host managers object list. Also manages WebSocket connection to the remote manager and relays the messagest to the host manager. :param host_manager: A host HttpMutaManager object. :return: """ self._host_manager = host_manager self._session = ClientSession(loop=self._host_manager._app.loop) # Open the WebSocket try: addr = self._address + '/api/notifications/websocket' self._ws = yield from self._session.ws_connect(addr) self._logger.debug("Opened websocket at %s" % addr) except WSServerHandshakeError: raise MutaManagerError("Cannot establish WS connection to %s" % self._address) # Start the WS manager self._ws_man = asyncio.async(self.ws_manager(), loop=self._session.loop) # Get and process the remote objects resp = yield from self._session.get(self._address + '/api/objects') if resp.status != 200: raise MutaManagerError("Cannot access remote objects at %s" % self._address) objects = (yield from resp.json()) or [] for obj in objects: self._add_remote_object(obj) self._is_attached = True self._logger.debug("Remote manager %s attached." % self._address)
[docs] @asyncio.coroutine def detach(self): if self.is_attached: self._logger.debug("Detaching remote manager %s" % self._address) for id, obj in self._registered_objects.items(): self._host_manager.remove_object(obj) # Just to be sure self._registered_objects = {} # Disconnect the WS manager self._ws_man.cancel() # Close the websocket and session yield from self._ws.close() yield from self._session.close() # Tell host manager that it's detached # self._host_manager._on_proxy_manager_detach(self) self._is_attached = False
@asyncio.coroutine def _disconnected(self): yield from self.detach()
[docs] @asyncio.coroutine def ws_manager(self): """ Coroutine implementing the Websocket communication task. """ self._logger.debug("Starting WS manager for remote") while True: try: msg = yield from self._ws.receive() self._logger.debug("Received ws msg %s" % str(msg)) if msg.type == WSMsgType.TEXT: data = json.loads(msg.data) cmd = data.get('type') if cmd == HttpMutaManager.NOTIFICATION_TERMINATION: self._is_being_removed = True yield from self._host_manager._remove_manager_proxy(self) elif cmd == HttpMutaManager.NOTIFICATION_OBJECTS_CHANGE: params = data.get('params', {}) action = params.get('action') if action == 'added': self._add_remote_object(params['objId']) elif action == 'removed': self._remove_remote_object(params['objId']) self._logger.debug("Relaying wsmessage: %s" % str(data)) self._host_manager._send_ws_message(data) elif msg.type == WSMsgType.CLOSED: self._logger.debug("Websocket closed: %s" % msg.data) yield from self._disconnected() break elif msg.type == WSMsgType.ERROR: self._logger.debug("Websocket error: %s" % msg.data) yield from self._disconnected() break except Exception as e: self._logger.exception("WS manager error") break self._logger.debug("WS manager finished.")
[docs]class HttpMutaObjectProxy: """ Utility class proxying remote MutaObjects through REST calls. """ def __init__(self, manager_proxy, obj_id): self._manager_proxy = manager_proxy self._address = "{0}/api/objects/{1}".format( self._manager_proxy.address, obj_id) self._obj_id = obj_id self._session = self._manager_proxy.session # For MutaClass type object compatibility
[docs] def is_muta_ready(self): return True
[docs] def muta_unregister(self): pass
[docs] def muta_init(self, object_id, change_callback=None): pass
@property def muta_id(self): return self._obj_id @asyncio.coroutine def _get_resource(self, resource_address): try: if not self._manager_proxy.is_attached: raise MutaManagerError("Remote manager is not attached.") resp = yield from self._session.get(self._address + resource_address) temp = yield from resp.text() return web.json_response(text=temp) except (ClientOSError, MutaManagerError) as e: return web.HTTPNotFound(text=str(e)) @asyncio.coroutine def _put_resource(self, resource_address): try: if not self._manager_proxy.is_attached: raise MutaManagerError("Remote manager is not attached.") # Add header saying it's from supervisor manager resp = yield from self._session.put(self._address + resource_address, headers={HttpMutaManager.HEADER_SUPERVISOR: 'true'}) text = yield from resp.text() return web.json_response(status=resp.status, text=text) except (ClientOSError, MutaManagerError) as e: return web.HTTPNotFound(text=str(e))
[docs] @asyncio.coroutine def get_object(self): return (yield from self._get_resource(''))
[docs] @asyncio.coroutine def get_props(self): return (yield from self._get_resource('/props'))
[docs] @asyncio.coroutine def get_prop(self, prop_id): return (yield from self._get_resource('/props/{0}'.format(prop_id)))
[docs] @asyncio.coroutine def get_prop_value(self, prop_id): return (yield from self._get_resource('/props/{0}/value' .format(prop_id)))
[docs] @asyncio.coroutine def set_prop_value(self, prop_id, value): return (yield from self._put_resource('/props/{0}?value={1}' .format(prop_id, value)))
[docs] @asyncio.coroutine def set_prop_action(self, prop_id): return (yield from self._put_resource('/props/{0}/action' .format(prop_id)))