Source code for p2p0mq.message

# -*- coding: utf-8 -*-
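"""
Wire message used by p2p0mq.

Defines :class:`Message` together with the helpers that create replies,
encode a message into its multi-part form, parse it back and validate it
before it is sent.
"""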
from __future__ import unicode_literals
from __future__ import print_function

import logging
from time import time

from umsgpack import packb, unpackb

from p2p0mq.constants import MESSAGE_TYPE_REPLY, MESSAGE_TYPE_REQUEST, DEFAULT_TIME_TO_LIVE
from p2p0mq.errors import MessageValidationError

# Module-level logger; Message.parse and Message.valid_for_send report
# malformed or incomplete messages through it.
logger = logging.getLogger('p2p0mq.message')

# Last message id handed out by get_next_message_id(); 0 means that no id
# has been issued yet.
next_message_id = 0


def get_next_message_id():
    """
    Advance the module-level counter and return the freshly issued id.

    The first call returns 1; every later call returns one more than the
    call before it.
    """
    global next_message_id
    issued = next_message_id + 1
    next_message_id = issued
    return issued


class Message(object):
    """
    A message we send down the wire.

    Instance fields (all set by the constructor):

    * ``source`` / ``to``: sender and recipient of the message
      (:meth:`create_reply` swaps them).
    * ``previous_hop`` / ``next_hop``: hop the message arrived from and hop
      it is sent to; :meth:`encode` emits ``next_hop`` as the first part and
      :meth:`parse` reads ``previous_hop`` from the first part.
    * ``command``: command identifier, emitted unchanged by :meth:`encode`.
    * ``handler``: opaque object required by :meth:`valid_for_send`; it is
      never put on the wire by this class.
    * ``kind``: ``MESSAGE_TYPE_REQUEST`` or ``MESSAGE_TYPE_REPLY`` (or any
      non-bool value handed in through ``reply``).
    * ``payload``: dictionary built from the extra keyword arguments.
    * ``message_id``: identifier used to pair a reply with its request.
    * ``time_to_live``: absolute deadline, i.e. ``time()`` at construction
      plus the ``time_to_live`` argument.
    """

    # Fields that must not be None for the message to be sendable, in the
    # order in which they are checked (and reported) by valid_for_send().
    _REQUIRED_FOR_SEND = (
        'to', 'source', 'command', 'handler',
        'kind', 'message_id', 'time_to_live',
    )

    def __init__(self, source=None, to=None,
                 previous_hop=None, next_hop=None,
                 command=None, reply=False, handler=None,
                 message_id=None, time_to_live=DEFAULT_TIME_TO_LIVE,
                 **kwargs):
        """
        Constructor.

        :param source: sender of the message.
        :param to: recipient of the message.
        :param previous_hop: hop the message was received from.
        :param next_hop: hop the message will be sent to.
        :param command: command identifier.
        :param reply: a bool selects ``MESSAGE_TYPE_REPLY`` (True) or
            ``MESSAGE_TYPE_REQUEST`` (False); any other value is stored
            unchanged as ``kind``.
        :param handler: opaque handler object stored on the message.
        :param message_id: identifier to use; any falsy value (``None``,
            ``0``, ...) makes the constructor take a fresh one from
            :func:`get_next_message_id`.
        :param time_to_live: offset added to the current ``time()`` to
            obtain the absolute deadline stored in ``self.time_to_live``.
        :param kwargs: copied into ``self.payload``.
        """
        super(Message, self).__init__()
        self.source = source
        self.to = to
        self.previous_hop = previous_hop
        self.next_hop = next_hop
        self.command = command
        self.handler = handler
        if isinstance(reply, bool):
            self.kind = MESSAGE_TYPE_REPLY if reply else MESSAGE_TYPE_REQUEST
        else:
            # Not a flag: the caller handed us the kind value itself
            # (this is what parse() does with the byte read off the wire).
            self.kind = reply
        self.payload = dict(kwargs)
        # Stored as an absolute point in time, not as a duration.
        self.time_to_live = time() + time_to_live
        self.message_id = message_id if message_id else get_next_message_id()

    def __str__(self):
        return 'Message(to=%r, src=%r, cmd=%r, message_id=%r)' % (
            self.to, self.source, self.command, self.message_id)

    def __repr__(self):
        return (
            'Message('
            'source=%r, to=%r, ph=%r, nh=%r, command=%r, '
            'kind=%r, payload=%r, message_id=%r, ttl=%r)' % (
                self.source, self.to, self.previous_hop, self.next_hop,
                self.command, self.kind, self.payload,
                self.message_id, self.time_to_live
            )
        )

    def create_reply(self, source=None, to=None,
                     previous_hop=None, next_hop=None,
                     command=None, reply=True, handler=None,
                     message_id=None, time_to_live=DEFAULT_TIME_TO_LIVE,
                     **kwargs):
        """
        Creates a reply to the sender of this message.

        Every argument left at ``None`` is derived from this message:
        ``source``/``to`` and ``previous_hop``/``next_hop`` are swapped,
        while ``command``, ``handler`` and ``message_id`` are copied.
        A falsy ``message_id`` (not only ``None``) also falls back to the
        id of this message.

        :param time_to_live: offset from now for the deadline of the reply;
            the deadline of this message is not inherited.
        :param kwargs: payload of the reply.
        :return: a new :class:`Message`.
        """
        return Message(
            source=source if source is not None else self.to,
            to=to if to is not None else self.source,
            previous_hop=(
                previous_hop if previous_hop is not None else self.next_hop),
            next_hop=next_hop if next_hop is not None else self.previous_hop,
            command=command if command is not None else self.command,
            reply=reply,
            handler=handler if handler is not None else self.handler,
            message_id=message_id if message_id else self.message_id,
            # Handed to the constructor directly instead of overwriting the
            # default deadline afterwards; the stored value is the same.
            time_to_live=time_to_live,
            **kwargs
        )

    def encode(self, app_uuid):
        """
        Converts the message into a string of bytes suitable for transfer.

        Side effects: a missing ``next_hop`` is set to ``to`` and a missing
        ``source`` is set to ``app_uuid``.

        :param app_uuid: identifier of the local application.
        :return: a 7-tuple ``(next_hop, source, to, kind, command,
            message_id, payload)`` where ``source`` is ``b''`` when it
            equals ``app_uuid``, ``to`` is ``b''`` when it equals
            ``next_hop``, ``kind`` is a single byte and ``message_id`` and
            ``payload`` are msgpack-encoded. :meth:`parse` restores the
            two elided fields on the receiving side.
        :raises AssertionError: if ``to`` or ``command`` is ``None``.
        """
        assert self.to is not None, "`to` field is not filled in"
        assert self.command is not None, "`command` field is not filled in"
        if self.next_hop is None:
            self.next_hop = self.to
        if self.source is None:
            self.source = app_uuid

        # TODO: BUG self.to if self.to != self.to else b''
        # NOTE(review): the TODO above is the original author's note, kept
        # verbatim; confirm whether it still applies to the `to` part below.
        return (
            self.next_hop,
            self.source if self.source != app_uuid else b'',
            self.to if self.to != self.next_hop else b'',
            bytes([self.kind]),
            self.command,
            packb(self.message_id),
            packb(self.payload),
        )

    @staticmethod
    def parse(raw_data, app_uuid):
        """
        Builds a message from the parts produced by :meth:`encode`.

        :param raw_data: sequence of exactly 7 parts; an empty second part
            means "source is the previous hop" and an empty third part
            means "recipient is this application".
        :param app_uuid: identifier of the local application.
        :return: the :class:`Message`, or ``None`` (after logging an error)
            when the data is malformed: wrong number of parts, empty kind
            part, or id/payload that cannot be msgpack-decoded.
        """
        # Local import: only the exception type is needed, and only here.
        from umsgpack import UnpackException

        if len(raw_data) != 7:
            logger.error("Received malformed message (%d parts)",
                         len(raw_data))
            logger.debug("Offending message was: %r", raw_data)
            return None

        kind_part = raw_data[3]
        if len(kind_part) == 0:
            # Indexing an empty part would raise IndexError; treat it like
            # any other malformed message instead.
            logger.error("Received malformed message (empty kind part)")
            logger.debug("Offending message was: %r", raw_data)
            return None

        try:
            message_id = unpackb(raw_data[5])
            payload = unpackb(raw_data[6])
        except UnpackException as exc:
            logger.error("Received malformed message (%s)", exc)
            logger.debug("Offending message was: %r", raw_data)
            return None

        message = Message(
            next_hop=None,
            previous_hop=raw_data[0],
            source=raw_data[1] if len(raw_data[1]) != 0 else raw_data[0],
            to=raw_data[2] if len(raw_data[2]) != 0 else app_uuid,
            reply=kind_part[0],
            command=raw_data[4],
            message_id=message_id,
        )
        # Assigned after construction so that the decoded value is stored
        # as-is rather than being expanded into keyword arguments.
        message.payload = payload
        return message

    def valid_for_send(self, app, verbose=True):
        """
        Makes sure that this message has required fields for sending it
        by the sender.

        ``to``, ``source``, ``command``, ``handler``, ``kind``,
        ``message_id`` and ``time_to_live`` must not be ``None`` and
        ``time_to_live`` must not be smaller than ``app.tick``.
        ``next_hop`` is deliberately not required.

        :param app: object exposing a ``tick`` attribute that can be
            compared with ``time_to_live``.
        :param verbose: when true, the first failed check is logged as an
            error; when false the check is silent.
        :return: ``True`` if the message can be sent, ``False`` otherwise.
        """
        for field in self._REQUIRED_FOR_SEND:
            if getattr(self, field) is None:
                if verbose:
                    logger.error("`%s` field is not filled in", field)
                return False

        if self.time_to_live < app.tick:
            if verbose:
                logger.error("`time_to_live` is %r, but should be in the "
                             "future relative to `app.tick` = %r",
                             self.time_to_live, app.tick)
            return False

        return True

    @staticmethod
    def validate_messages_for_send(message, app):
        """
        Makes sure that one or more messages have required fields for
        sending them by the sender.

        :param message: a single message, or a list, tuple or set of
            messages.
        :param app: passed through to :meth:`valid_for_send`.
        :return: ``True`` if every message is valid (an empty collection is
            valid). Checking stops at the first invalid message, so later
            messages are neither checked nor logged.
        """
        if isinstance(message, (list, tuple, set)):
            return all(m_one.valid_for_send(app) for m_one in message)
        return message.valid_for_send(app)