# -*- coding: utf-8 -*-
"""
sockjs.tornado.session
~~~~~~~~~~~~~~~~~~~~~~
SockJS session implementation.
"""
import logging
from sockjs.tornado import sessioncontainer, periodic, proto
from sockjs.tornado.util import bytes_to_str
LOG = logging.getLogger("tornado.general")
[docs]class ConnectionInfo(object):
"""Connection information object.
Will be passed to the ``on_open`` handler of your connection class.
Has few properties:
`ip`
Caller IP address
`cookies`
Collection of cookies
`arguments`
Collection of the query string arguments
`headers`
Collection of explicitly exposed headers from the request including:
origin, referer, x-forward-for (and associated headers)
`path`
Request uri path
"""
_exposed_headers = set(['referer', 'x-client-ip', 'x-forwarded-for',
'x-cluster-client-ip', 'via', 'x-real-ip'])
def __init__(self, ip, cookies, arguments, headers, path):
self.ip = ip
self.cookies = cookies
self.arguments = arguments
self.headers = {}
self.path = path
for header in headers:
if header.lower() in ConnectionInfo._exposed_headers:
self.headers[header] = headers[header]
def get_argument(self, name):
"""Return single argument by name"""
val = self.arguments.get(name)
if val:
return val[0]
return None
def get_cookie(self, name):
"""Return single cookie by its name"""
return self.cookies.get(name)
def get_header(self, name):
"""Return single header by its name"""
return self.headers.get(name)
# Session states
CONNECTING = 0
OPEN = 1
CLOSING = 2
CLOSED = 3
[docs]class BaseSession(object):
"""Base session implementation class"""
[docs] def __init__(self, conn, server):
"""Base constructor.
`conn`
Connection class
`server`
SockJSRouter instance
"""
self.server = server
self.stats = server.stats
self.send_expects_json = False
self.handler = None
self.state = CONNECTING
self.conn_info = None
self.conn = conn(self)
self.close_reason = None
[docs] def set_handler(self, handler):
"""Set transport handler
``handler``
Handler, should derive from the `sockjs.tornado.transports.base.BaseTransportMixin`.
"""
if self.handler is not None:
raise Exception('Attempted to overwrite BaseSession handler')
self.handler = handler
self.transport_name = self.handler.name
if self.conn_info is None:
self.conn_info = handler.get_conn_info()
self.stats.on_sess_opened(self.transport_name)
return True
[docs] def verify_state(self):
"""Verify if session was not yet opened. If it is, open it and call connections `on_open`"""
if self.state == CONNECTING:
self.state = OPEN
self.conn.on_open(self.conn_info)
[docs] def remove_handler(self, handler):
"""Remove active handler from the session
`handler`
Handler to remove
"""
# Attempt to remove another handler
if self.handler != handler:
raise Exception('Attempted to remove invalid handler')
self.handler = None
[docs] def close(self, code=3000, message='Go away!'):
"""Close session or endpoint connection.
`code`
Closing code
`message`
Close message
"""
if self.state != CLOSED:
try:
self.conn.on_close()
except:
LOG.debug("Failed to call on_close().", exc_info=True)
finally:
self.state = CLOSED
self.close_reason = (code, message)
# Bump stats
self.stats.on_sess_closed(self.transport_name)
# If we have active handler, notify that session was closed
if self.handler is not None:
self.handler.session_closed()
[docs] def delayed_close(self):
"""Delayed close - won't close immediately, but on next ioloop tick."""
self.state = CLOSING
self.server.io_loop.add_callback(self.close)
[docs] def get_close_reason(self):
"""Return last close reason tuple.
For example:
if self.session.is_closed:
code, reason = self.session.get_close_reason()
"""
if self.close_reason:
return self.close_reason
return (3000, 'Go away!')
@property
[docs] def is_closed(self):
"""Check if session was closed."""
return self.state == CLOSED or self.state == CLOSING
[docs] def send_message(self, msg, stats=True, binary=False):
"""Send or queue outgoing message
`msg`
Message to send
`stats`
If set to True, will update statistics after operation completes
"""
raise NotImplemented()
[docs] def send_jsonified(self, msg, stats=True):
"""Send or queue outgoing message which was json-encoded before. Used by the `broadcast`
method.
`msg`
JSON-encoded message to send
`stats`
If set to True, will update statistics after operation completes
"""
raise NotImplemented()
[docs] def broadcast(self, clients, msg):
"""Optimized `broadcast` implementation. Depending on type of the session, will json-encode
message once and will call either `send_message` or `send_jsonifed`.
`clients`
Clients iterable
`msg`
Message to send
"""
self.server.broadcast(clients, msg)
[docs]class Session(BaseSession, sessioncontainer.SessionMixin):
"""SockJS session implementation.
"""
[docs] def __init__(self, conn, server, session_id, expiry=None):
"""Session constructor.
`conn`
Default connection class
`server`
`SockJSRouter` instance
`session_id`
Session id
`expiry`
Session expiry time
"""
# Initialize session
sessioncontainer.SessionMixin.__init__(self, session_id, expiry)
BaseSession.__init__(self, conn, server)
self.send_queue = ''
self.send_expects_json = True
# Heartbeat related stuff
self._heartbeat_timer = None
self._heartbeat_interval = self.server.settings['heartbeat_delay'] * 1000
self._immediate_flush = self.server.settings['immediate_flush']
self._pending_flush = False
self._verify_ip = self.server.settings['verify_ip']
# Session callbacks
[docs] def on_delete(self, forced):
"""Session expiration callback
`forced`
If session item explicitly deleted, forced will be set to True. If
item expired, will be set to False.
"""
# Do not remove connection if it was not forced and there's running connection
if not forced and self.handler is not None and not self.is_closed:
self.promote()
else:
self.close()
# Add session
[docs] def set_handler(self, handler, start_heartbeat=True):
"""Set active handler for the session
`handler`
Associate active Tornado handler with the session
`start_heartbeat`
Should session start heartbeat immediately
"""
# Check if session already has associated handler
if self.handler is not None:
handler.send_pack(proto.disconnect(2010, "Another connection still open"))
return False
if self._verify_ip and self.conn_info is not None:
# If IP address doesn't match - refuse connection
if handler.request.remote_ip != self.conn_info.ip:
LOG.error('Attempted to attach to session %s (%s) from different IP (%s)' % (
self.session_id,
self.conn_info.ip,
handler.request.remote_ip
))
handler.send_pack(proto.disconnect(2010, "Attempted to connect to session from different IP"))
return False
if self.state == CLOSING or self.state == CLOSED:
handler.send_pack(proto.disconnect(*self.get_close_reason()))
return False
# Associate handler and promote session
super(Session, self).set_handler(handler)
self.promote()
if start_heartbeat:
self.start_heartbeat()
return True
[docs] def verify_state(self):
"""Verify if session was not yet opened. If it is, open it and call connections `on_open`"""
# If we're in CONNECTING state - send 'o' message to the client
if self.state == CONNECTING:
self.handler.send_pack(proto.CONNECT)
# Call parent implementation
super(Session, self).verify_state()
[docs] def remove_handler(self, handler):
"""Detach active handler from the session
`handler`
Handler to remove
"""
super(Session, self).remove_handler(handler)
self.promote()
self.stop_heartbeat()
[docs] def send_message(self, msg, stats=True, binary=False):
"""Send or queue outgoing message
`msg`
Message to send
`stats`
If set to True, will update statistics after operation completes
"""
self.send_jsonified(proto.json_encode(bytes_to_str(msg)), stats)
[docs] def send_jsonified(self, msg, stats=True):
"""Send JSON-encoded message
`msg`
JSON encoded string to send
`stats`
If set to True, will update statistics after operation completes
"""
msg = bytes_to_str(msg)
if self._immediate_flush:
if self.handler and self.handler.active and not self.send_queue:
# Send message right away
self.handler.send_pack('a[%s]' % msg)
else:
if self.send_queue:
self.send_queue += ','
self.send_queue += msg
self.flush()
else:
if self.send_queue:
self.send_queue += ','
self.send_queue += msg
if not self._pending_flush:
self.server.io_loop.add_callback(self.flush)
self._pending_flush = True
if stats:
self.stats.on_pack_sent(1)
[docs] def flush(self):
"""Flush message queue if there's an active connection running"""
self._pending_flush = False
if self.handler is None or not self.handler.active or not self.send_queue:
return
self.handler.send_pack('a[%s]' % self.send_queue)
self.send_queue = ''
[docs] def close(self, code=3000, message='Go away!'):
"""Close session.
`code`
Closing code
`message`
Closing message
"""
if self.state != CLOSED:
# Notify handler
if self.handler is not None:
self.handler.send_pack(proto.disconnect(code, message))
super(Session, self).close(code, message)
# Heartbeats
[docs] def start_heartbeat(self):
"""Reset hearbeat timer"""
self.stop_heartbeat()
self._heartbeat_timer = periodic.Callback(self._heartbeat,
self._heartbeat_interval,
self.server.io_loop)
self._heartbeat_timer.start()
[docs] def stop_heartbeat(self):
"""Stop active heartbeat"""
if self._heartbeat_timer is not None:
self._heartbeat_timer.stop()
self._heartbeat_timer = None
[docs] def delay_heartbeat(self):
"""Delay active heartbeat"""
if self._heartbeat_timer is not None:
self._heartbeat_timer.delay()
[docs] def _heartbeat(self):
"""Heartbeat callback"""
if self.handler is not None:
self.handler.send_pack(proto.HEARTBEAT)
else:
self.stop_heartbeat()
[docs] def on_messages(self, msg_list):
"""Handle incoming messages
`msg_list`
Message list to process
"""
self.stats.on_pack_recv(len(msg_list))
for msg in msg_list:
if self.state == OPEN:
self.conn.on_message(msg)