Source code for p2p0mq.app.server

# -*- coding: utf-8 -*-
"""
"""
from __future__ import unicode_literals
from __future__ import print_function

import logging

import zmq
from zmq.backend.cython.error import zmq_errno

from p2p0mq.constants import (
    TRACE, RECEIVE_LIMIT_PER_LOOP, MESSAGE_TYPE_REPLY, MESSAGE_TYPE_REQUEST,
    LOOP_CONTINUE, ISOLATE, MESSAGE_TYPE_ROUTE,
    TRACE_PACKETS, TRACE_NET)
from p2p0mq.errors import ValidationError, MessageValidationError
from p2p0mq.message import Message
from p2p0mq.message_queue.slow import SlowMessageQueue
from p2p0mq.utils.thread.netthread import KoNetThread

logger = logging.getLogger('p2p0mq.app.s')


[docs]class Receiver(KoNetThread): """ A thread that receives requests and information from peers. This is a "server" according to the [zmq_curve](http://api.zeromq.org/4-1:zmq-curve) which states that: > A socket using CURVE can be either client or server, > at any moment, but not both. > The role is independent of bind/connect direction. To become a CURVE server, the local peer sets the ZMQ_CURVE_SERVER option on the socket, and then sets the ZMQ_CURVE_SECRETKEY option to provide the socket with its long-term secret key. The local peer does not provide the socket with its long-term public key, which is used only by clients. The receiver listens to messages and creates messages out of them. The messages are then placed in the queue where the local peer will pick them up. For now the receiver understands two kinds of messages: requests and replies. However,adding additional types is easy: add a queue to the typed_queues member and create code on application side to offload this queue. """ def __init__(self, *args, **kwargs): """ Constructor. """ super(Receiver, self).__init__(*args, **kwargs) self.name = 'p2p0mq.S.th' if self.app is None or self.app.uuid is None \ else ('%s-p2p0mq.S.th' % self.app.uuid[-4:].decode("utf-8")) self.typed_queues = { MESSAGE_TYPE_REPLY: SlowMessageQueue(), MESSAGE_TYPE_REQUEST: SlowMessageQueue(), MESSAGE_TYPE_ROUTE: SlowMessageQueue(), }
[docs] def create(self): """ Called at thread start to initialize the state. """ logger.debug("Receiver is being created...") super().create() # The REP socket type acts as as service for a set of client peers, # receiving requests and sending replies back to the requesting # peers. It is designed for simple remote-procedure call models. # https://rfc.zeromq.org/spec:28/REQREP/#the-rep-socket-type new_socket = self.context.socket(zmq.ROUTER) # new_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Do not keep messages in memory that were not send yet when # we attempt to close the socket. # http://api.zeromq.org/2-1:zmq-setsockopt#toc15 new_socket.setsockopt(zmq.LINGER, 0) # The ZMQ_IDENTITY option shall set the identity of # the specified socket. Socket identity determines if # existing ØMQ infrastructure (message queues, # forwarding devices) shall be identified with a # specific application and persist across multiple # runs of the local peer. # # If the socket has no identity, each run of an # application is completely separate from other runs. # However, with identity set the socket shall re-use # any existing ØMQ infrastructure configured by the # previous run(s). Thus the local peer may receive # messages that were sent in the meantime, message # queue limits shall be shared with previous run(s) # and so on. # http://api.zeromq.org/2-1:zmq-setsockopt#toc6 new_socket.setsockopt(zmq.IDENTITY, self.app.uuid) if not self.app.no_encryption: logger.debug( "Application %r loads certificates for its Receiver from %s", self.app.uuid, self.app.private_file) server_public, server_secret = zmq.auth.load_certificate( self.app.private_file) # To become a CURVE server, the local peer sets the ZMQ_CURVE_SERVER # option on the socket, new_socket.curve_server = True # and then sets the ZMQ_CURVE_SECRETKEY option # to provide the socket with its long-term secret key. new_socket.curve_secretkey = server_secret # The local peer does not provide the socket with # its long-term public key, which is used only by clients. # socket.curve_publickey = server_public address = self.address.replace('0.0.0.0', '*') logger.log(TRACE, "Attempting to bind to %r...", address) new_socket.bind(address) logger.debug("Receiver bound to %s", address) self.socket = new_socket logger.debug("Receiver was created") return new_socket
[docs] def terminate(self): """ Called at thread end to free resources. """ super().terminate() assert self.socket is None logger.debug("receiver (server) thread was terminated on app %r", (self.app.uuid if self.app is not None else ''))
[docs] def enqueue(self, message, messages): """ Places a message in the appropriate queue. Messages not destined to us are placed in a special queue. Others are split based on their type field. Messages that have unknown types are silently discarded. """ # Is this message destined to this instance? if message.to != self.app.uuid: queue = messages[MESSAGE_TYPE_ROUTE] else: try: queue = messages[message.kind] except KeyError: logger.error("Received unknown message type: %r", message.kind) return # Update the queue. queue.append(message) logger.log(TRACE, "added to queue %r", queue)
[docs] def update_queues(self, messages): """ Places messages in queues all at once. The messages is a dictionary, with keys being the """ for kind in self.typed_queues: from_queue = messages[kind] if len(from_queue) > 0: self.typed_queues[kind].enqueue(from_queue)
[docs] def receive_message(self): """ Receive a single message. """ message = None # noinspection PyBroadException try: # TODO: as we're converting the data right away do we need copy? raw_data = self.socket.recv_multipart(copy=True) logger.log(TRACE_PACKETS, "<<<<<<<<<<<<<<< received message %r", raw_data) message = Message.parse(raw_data, self.app.uuid) logger.log(TRACE_NET, "converted into message %r", message) except Exception: logger.error("Received invalid message", exc_info=True) return message
[docs] def execute(self): """ Called to execute the main part of the thread. In implementations where this function is executed in a loop it is expected to return False to break the loop end terminate the thread. """ # We store the messages here and we update the queues all at once. messages = dict([(kind, []) for kind in self.typed_queues]) # Process incoming data. for i in range(RECEIVE_LIMIT_PER_LOOP): # Wait a bit for messages. Early exit if not. try: result = self.socket.poll(timeout=100, flags=zmq.POLLIN) except zmq.error.ZMQError: logger.error("Got ZMQ error %r in server poll", zmq_errno(), exc_info=True) break if not (result & zmq.POLLIN): break # Process it. message = self.receive_message() if message is not None: self.enqueue(message, messages) # Place them in queue all at once. self.update_queues(messages) return LOOP_CONTINUE