Concerns

The concerns deal with message creation and handling on both sides of the connection.

Each concern has a command id and can therefore handle exactly one command-reply pair. The id is set in the constructor and should be kept constant after the instance has been made part of the application via start().

Once the local peer has been started, the main loop calls execute() on the concern on each loop, in the context of the app thread. The concern is free to add messages to the application queue, but it should not send them directly.

The local peer builds maps for concerns so that, when a request or reply arrives, the appropriate method is called in the context of the sender/receiver thread.

Manager

Concern settings and actions are grouped in a distinct class, ConcernsManager. The current implementation merges the manager into the application class.

The class stores a dictionary of active concerns indexed by their command id (NOT the name). Some built-in concerns are automatically added via add_all_library_concerns(); other concerns should be added using add_concern().

The manager informs the Concern that it is entering active duty by calling its start() method, either when the application is starting or, if the concern is added later, at add_concern() time.

Initialization of the manager happens in start_concerns(); the start() method of each concern in the list is invoked at this time.

Termination of the manager is equally simple, with terminate() method of each concern being called.
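The life cycle described above can be sketched as follows. This is a minimal illustration, not the library's code: ConcernsManagerSketch and RecordingConcern are hypothetical stand-ins, and only the behaviour stated in the text (the dictionary keyed by command id, the concerns_started flag, start() and terminate() calls) is modelled.

```python
class RecordingConcern:
    """Hypothetical concern that only records its life-cycle events."""

    def __init__(self, command_id):
        self.command_id = command_id
        self.events = []

    def start(self):
        self.events.append('start')

    def terminate(self):
        self.events.append('terminate')


class ConcernsManagerSketch:
    """Stand-in for the manager; not the real ConcernsManager."""

    def __init__(self):
        self.concerns = {}             # indexed by command id, not by name
        self.concerns_started = False

    def add_concern(self, concern):
        # The command id must not already be registered.
        assert concern.command_id not in self.concerns
        self.concerns[concern.command_id] = concern
        if self.concerns_started:
            # Added after startup: the manager calls start() right away.
            concern.start()

    def start_concerns(self):
        for concern in self.concerns.values():
            concern.start()
        self.concerns_started = True

    def terminate_concerns(self):
        for concern in self.concerns.values():
            concern.terminate()
        self.concerns_started = False
```

A concern added before start_concerns() is started by that call; one added afterwards is started by add_concern() itself, which is what the concerns_started flag decides.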

In the context of the local peer thread, on each loop, the manager participates with:

Message processing methods (both requests and replies) take their input from application level queues. On each loop a number of messages (not larger than PROCESS_LIMIT_PER_LOOP) will be de-queued, the corresponding Concern will be located and asked to process_reply() or to process_request(). Either can return a message which the local peer will enqueue (this is just a convenience feature; the concern can enqueue the messages itself and return None).
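A rough sketch of the per-loop processing described above, under stated assumptions: messages are modelled as plain dictionaries, process_messages() and EchoConcern are hypothetical names, and the value of PROCESS_LIMIT_PER_LOOP is made up for the example (the real one is defined by the library).

```python
import queue

# Assumed value for illustration only.
PROCESS_LIMIT_PER_LOOP = 10


class EchoConcern:
    """Hypothetical concern that echoes the payload of each request."""
    command_id = b'echo'

    def process_request(self, message):
        # Returning a message asks the caller to enqueue it.
        return {'command': self.command_id, 'payload': message['payload']}

    def process_reply(self, message):
        # Nothing to send after a reply.
        return None


def process_messages(concerns, in_queue, out_queue, reply):
    """Dispatch at most PROCESS_LIMIT_PER_LOOP messages; return the count."""
    handled = 0
    for _ in range(PROCESS_LIMIT_PER_LOOP):
        try:
            message = in_queue.get_nowait()
        except queue.Empty:
            break
        handled += 1
        concern = concerns.get(message['command'])
        if concern is None:
            continue  # no concern registered for this command id
        if reply:
            result = concern.process_reply(message)
        else:
            result = concern.process_request(message)
        if result is not None:
            out_queue.put(result)
    return handled
```

Messages beyond the limit stay in the queue and are picked up on the next loop, so a burst of traffic cannot starve the rest of the main loop.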

A Concern

The interface for concerns is defined in Concern. Users of the library will often create subclasses of it to implement new types of messages.

The Concern is informed about events regarding its own life (start() and terminate()) and about the life of the messages it handles (message_sent(), send_failed(), message_dropped()).

Its main functions are:

  • to generate messages, either from execute() (called on each thread loop) or by using new methods (include compose in the name of the method for consistency);
  • to reply to messages in process_request();
  • to handle message responses in process_reply().
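The three functions listed above might look like this in a subclass. The sketch is self-contained, so it uses a stand-in Concern base class with the constructor arguments documented for the real one. PingConcern, the dictionary message format, and the outbox list (a placeholder for the application queue) are all assumptions made for the example.

```python
class Concern:
    """Stand-in for the library's Concern base class (sketch only)."""

    def __init__(self, name, command_id, app=None):
        self.name = name
        self.command_id = command_id
        self.app = app


class PingConcern(Concern):
    """Hypothetical concern: sends 'ping' requests, answers with 'pong'."""

    def __init__(self, app=None):
        super().__init__(name='ping', command_id=b'ping', app=app)
        self.pending = []    # peers we still have to ping
        self.answered = []   # peers that replied to us
        self.outbox = []     # placeholder for the application queue

    def compose_ping_request(self, peer):
        # 'compose' in the method name follows the naming convention.
        return {'command': self.command_id, 'to': peer, 'payload': b'ping'}

    def execute(self):
        # Called on each loop: generate messages, never send them directly.
        for peer in self.pending:
            self.outbox.append(self.compose_ping_request(peer))
        self.pending.clear()

    def process_request(self, message):
        # Receiver side: reply to the request.
        return {'command': self.command_id,
                'to': message['from'],
                'payload': b'pong'}

    def process_reply(self, message):
        # Sender side: record the response; nothing more to send.
        self.answered.append(message['from'])
        return None
```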

Built-in Concerns

Connector

The purpose of this concern is to monitor the list of peers for new entries and to attempt to establish a connection with them.

For peers in the initial state (those we never attempted to connect to before) that also have a host set, we create a message in which we also set our connection parameters.

The INITIAL state of the peer becomes CONNECTING only when we are informed in message_sent() that we were able to send the message. If we get a failure via send_failed() or message_dropped() the state of the peer is set to NO_CONNECTION.

For peers in CONNECTING state (message sent but reply did not arrive), if the timeout has been exceeded, we also set the NO_CONNECTION state.

When we receive a connect request, the state of the originating peer is updated to either CONNECTED or ROUTED, depending on the path the request arrived on. The same thing happens when the reply to a previous request is received. The peer is left in a state appropriate for the heart-beat concern to pick up.
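The state transitions described in this section can be summarised in a small sketch. The state names come from the text; the Peer class, the function names, and the CONNECT_TIMEOUT value are hypothetical and chosen only to keep the example self-contained.

```python
import enum


class State(enum.Enum):
    INITIAL = 'initial'
    CONNECTING = 'connecting'
    CONNECTED = 'connected'
    ROUTED = 'routed'
    NO_CONNECTION = 'no_connection'


CONNECT_TIMEOUT = 5.0  # assumed value, in seconds


class Peer:
    """Hypothetical minimal peer record."""

    def __init__(self, host=None):
        self.host = host
        self.state = State.INITIAL
        self.deadline = None  # when a CONNECTING attempt times out


def on_message_sent(peer, now):
    # INITIAL becomes CONNECTING only once the send is confirmed.
    if peer.state is State.INITIAL:
        peer.state = State.CONNECTING
        peer.deadline = now + CONNECT_TIMEOUT


def on_send_failed_or_dropped(peer):
    peer.state = State.NO_CONNECTION


def check_timeout(peer, now):
    # A CONNECTING peer whose reply never arrived gives up.
    if peer.state is State.CONNECTING and now > peer.deadline:
        peer.state = State.NO_CONNECTION


def on_connect_message(peer, direct):
    # Applies to both incoming requests and replies to our requests.
    peer.state = State.CONNECTED if direct else State.ROUTED
```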

Heart-beats

The purpose of this concern is to monitor connectivity status for each peer in the list by regularly sending them small messages and measuring the time it takes to get back.

Using the execute() hook the concern monitors the state of the peers which have three relevant members:

  • next_heart_beat_time: at this time (in seconds since Epoch) a heart beat request will be sent if nothing resets the timer until then;
  • last_heart_beat_time: records the last time a message has been seen from this particular peer and is used to change the state of the peer to UNRESPONSIVE (if UNRESPONSIVE_THRESHOLD seconds have passed) or to NO_CONNECTION (if NO_CONNECTION_THRESHOLD seconds have passed);
  • slow_heart_beat_down: when next_heart_beat_time is reached a message is sent and slow_heart_beat_down is increased by HEART_BEAT_SLOW_DOWN seconds, so the time between two consecutive heart-beat requests grows with each request (see schedule_heart_beat()).
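A sketch of how the three members could interact, under several assumptions: all constant values are invented, HEART_BEAT_INTERVAL is a hypothetical base interval, the function names are not the library's, and resetting slow_heart_beat_down when a message is seen is a guess about what "resets the timer" means.

```python
HEART_BEAT_INTERVAL = 2.0        # hypothetical base interval, seconds
HEART_BEAT_SLOW_DOWN = 0.5       # assumed value
UNRESPONSIVE_THRESHOLD = 10.0    # assumed value
NO_CONNECTION_THRESHOLD = 30.0   # assumed value


class Peer:
    """Hypothetical peer holding only the heart-beat members."""

    def __init__(self, now):
        self.state = 'CONNECTED'
        self.last_heart_beat_time = now
        self.slow_heart_beat_down = 0.0
        self.next_heart_beat_time = now + HEART_BEAT_INTERVAL


def schedule_next(peer, now):
    # The slow-down is added on top of the base interval.
    peer.next_heart_beat_time = (
        now + HEART_BEAT_INTERVAL + peer.slow_heart_beat_down)


def message_seen(peer, now):
    # Any message from the peer resets the timers (assumption).
    peer.last_heart_beat_time = now
    peer.slow_heart_beat_down = 0.0
    schedule_next(peer, now)


def on_heart_beat_due(peer, now):
    """Return True if a heart-beat request should be sent now."""
    silence = now - peer.last_heart_beat_time
    if silence >= NO_CONNECTION_THRESHOLD:
        peer.state = 'NO_CONNECTION'
        return False  # left for the connector to handle
    if silence >= UNRESPONSIVE_THRESHOLD:
        peer.state = 'UNRESPONSIVE'
    # Each request sent stretches the gap before the next one.
    peer.slow_heart_beat_down += HEART_BEAT_SLOW_DOWN
    schedule_next(peer, now)
    return True
```

With these numbers a silent peer is probed after 2.0 s, then 2.5 s later, then 3.0 s later, and so on, until one of the thresholds changes its state.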

Ask-Around

Used for peers that we don’t know. See Routing for details about its workings.

class p2p0mq.concerns.manager.ConcernsManager(*args, **kwargs)[source]

Manages the concerns inside the local peer.

concerns

The list of concerns we know about, indexed by their command_id.

Type:dict
concerns_started

Flag to tell if the start method has been called. This is used to determine whose responsibility it is to call start() on newly added concerns.

Type:bool
add_all_library_concerns()[source]

Creates instances of some of the concerns defined in this package and adds them to the list.

The concerns added are:

The concerns defined in this package but not included by default are:

add_concern(concern)[source]

Adds a single concern to the list.

Parameters:concern (Concern) – The new concern to add. It is asserted that the command id is not present in the dictionary.
execute_concerns()[source]

Execute concerns.

Called on each execute step by the local peer. Call each concern’s execute method in turn.

process_common(queue, label, reply)[source]

Called on the local peer thread to process requests and replies.

process_replies(queue)[source]

Called on the local peer thread to process replies.

Replies are received by the server (Receiver) and are simply deposited in the queue. This function takes the replies and delivers them to concern handlers.

process_requests(queue)[source]

Called on the local peer thread to process requests.

Requests are received by the server (Receiver) and are simply deposited in the queue. This function takes the requests and delivers them to concern handlers.

start_concerns()[source]

Called by the local peer code at startup time to install hooks.

After this point the list should not be changed.

terminate_concerns()[source]

Called by the local peer code when the local peer ends.

This method should be written defensively, as the environment might not be fully set (an exception in create() does not prevent this method from being executed).

class p2p0mq.concerns.base.Concern(name, command_id, app=None, *args, **kwargs)[source]

Base class for concerns.

command_id

A unique command id used as part of the messages initiated by this concern. Same id is used both for requests and replies.

Type:bytes
name

Human readable name of this concern.

Type:str
app

The manager where this concern is installed.

Type:ConcernsManager
execute()[source]

Called from application thread on each thread loop.

message_dropped(message)[source]

We are informed that one of our messages was dropped.

This call is made in the context of the sending thread when the time-to-live of the message has expired. Unlike send_failed(), this method cannot return a message to be re-queued.

Parameters:message (Message) – The message that was dropped.
message_sent(message)[source]

We are informed that one of our messages was sent.

This call is made in the context of the sending thread.

Parameters:message (Message) – The message that was sent.
process_reply(message)[source]

Handler on the sender side for replies.

Parameters:message (Message) – The message that has been received.
process_request(message)[source]

Handler on the receiver side for requests.

Parameters:message (Message) – The message that has been received.
send_failed(message, exc=None)[source]

We are informed that one of our messages failed to send.

This call is made in the context of the sending thread and only if the time-to-live of the message has not expired. Otherwise, a call to message_dropped() is made.

By returning the same message the concern essentially implements a retry-until-expires mechanism.

Parameters:
  • message (Message) – The message that failed to send.
  • exc (Exception) – The exception that was raised, if any.
Returns:

the message to be re-queued (can be the same message). This is NOT a (PRIORITY, message) type of reply.
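The retry-until-expires behaviour can be illustrated with a short sketch. Only the contract of send_failed() and message_dropped() is taken from the text; the Message class with its expires_at field, RetryingConcern, and report_failure() (a stand-in for what the sending side does) are hypothetical.

```python
class Message:
    """Hypothetical message with an absolute expiry time."""

    def __init__(self, payload, time_to_live, now):
        self.payload = payload
        self.expires_at = now + time_to_live


class RetryingConcern:
    """Hypothetical concern that retries until the message expires."""

    def __init__(self):
        self.dropped = []

    def send_failed(self, message, exc=None):
        # Returning the same message asks for it to be re-queued.
        return message

    def message_dropped(self, message):
        # Nothing can be returned here; just take note.
        self.dropped.append(message)


def report_failure(concern, message, now, exc=None):
    """Stand-in for the sender: pick the callback and return the retry."""
    if now >= message.expires_at:
        concern.message_dropped(message)
        return None
    return concern.send_failed(message, exc)
```

Because the expiry check happens before send_failed() is called, a concern that always returns the message cannot loop forever: once the time-to-live runs out, the message is dropped instead.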

start()[source]

Called by the ConcernsManager to inform the concern that it was installed.

For concerns installed before the local peer has been started this method is called before entering main loop. The sender and the receiver are not started at that time.

terminate()[source]

Called by the ConcernsManager to inform the concern that it was uninstalled.

At this point main loop has been exited and the receiver and the sender have been stopped.

class p2p0mq.concerns.connector.ConnectorConcern(*args, **kwargs)[source]

Connects peers.

We continuously check for peers in initial state or peers that were requested to connect.

For peers in initial state we instruct the sender to connect to them and, once connected, we send them the greetings message.

connect_peer(peer, first=True)[source]

Take steps to connect a peer.

Parameters:
  • peer (Peer) – The peer to connect to.
  • first (bool) – tells if this is a connect or reconnect attempt.
connecting_peer(peer)[source]

Check a peer we attempted to connect.

If the timeout set when the message was created has been exceeded the peer is marked as not connected. A new connection attempt is also set into the future through reconnect_peer().

Parameters:peer (Peer) – The peer in question.
declare_no_connection(peer)[source]

We mark a peer as impossible to connect to.

A reconnect attempt will also be scheduled for some seconds into the future. At that point reconnect_peer() will be invoked.

Parameters:peer (Peer) – The peer in question.
execute()[source]

Called from application thread on each thread loop.

The method will look into all peers and decide actions based on their state:

  • for new peers in INITIAL state a message is enqueued and state is changed to CONNECTING;
  • for CONNECTING peers, if the timeout is exceeded the state is changed to NO_CONNECTION;
  • for peers in NO_CONNECTION state a reconnect is attempted if the time is right.

Any peer that doesn’t have a host set is ignored.

message_dropped(message)[source]

We are informed that one of our messages was dropped.

This call is made in the context of the sending thread.

message_sent(message)[source]

We are informed that one of our messages was sent.

This call is made in the context of the sending thread.

process_reply(message)[source]

A request to connect has been accepted by the peer.

This is the handler on the sender side for connect requests. We update the details based on the information in the reply and change state of the peer to either CONNECTED or ROUTED depending on the path the message has arrived on.

Parameters:message (Message) – The message to process.
process_request(message)[source]

A peer requests to connect to local peer.

This is the handler on the receiver side for connect requests. If we know this peer we update its details like host and port. If we don’t, we create a new Peer and add it to the local peer.

A new connection is attempted right away with the new details if the state is INITIAL, NO_CONNECTION, or UNREACHABLE. For other states (CONNECTING, CONNECTED, ROUTED) we change the state to either CONNECTED or ROUTED, depending on the path the message arrived on.

A reply is composed with our details (host, port) and is sent back along the path the request came from.

Parameters:message (Message) – The message to process.
reconnect_peer(peer)[source]

Re-attempt to connect a peer that failed before.

Parameters:peer (Peer) – The peer in question.
send_failed(message, exc=None)[source]

We are informed that one of our messages failed to send.

This call is made in the context of the sending thread.

Warning

As the handling of this message is special (see p2p0mq.app.client.Sender.connect_peers()), this method is prohibited from re-issuing a message by returning it.

class p2p0mq.concerns.heart_beat.HeartBeatConcern(*args, **kwargs)[source]

Manages the heart-beat signal between peers.

compose_heart_beat_request(peer)[source]

Creates a request for a heartbeat.

Parameters:peer (Peer) – The peer we should send the message to.
execute()[source]

Called from application thread on each thread loop.

We go through each CONNECTED, ROUTED or UNREACHABLE peer and send a heart beat message if the timeout has been reached.

expired_peer(peer, messages)[source]

A peer that has passed its heart-beat time.

If the time since last message is large enough either the NO_CONNECTION or UNRESPONSIVE state is set.

For peers in NO_CONNECTION state no heart-beat will be scheduled, as it is the responsibility of ConnectorConcern to bring it out of that state.

UNRESPONSIVE peers are only sent heart beat messages until they exit this state.

Parameters:
  • peer (Peer) – The peer whose heart-beat timeout has expired.
  • messages (list) – The list where we append any messages we decide need sending.
process_reply(message)[source]

A reply to our heart beat request has been received.

This is the handler on the sender side for heart beat replies. We reset the heart beat timer for the peer.

Parameters:message (Message) – The message we have received.
process_request(message)[source]

A heart-beat request message has arrived.

This is the handler on the receiver side for heart beat requests. We reset the heart beat timer for the peer that sent the message and we send a reply back.

Parameters:message (Message) – The message we have received.