Concerns¶
The concerns deal with creation and message handling on both sides of the connection.
Each concern has a command identification and, thus, can only handle
exactly one command-reply pair.
The command id is set in the constructor and
should be kept constant after the instance has been made part of the
application via start().
Once the local peer has been started, the main loop will call
execute()
on the concern on each loop in the context of the app thread.
The concern is free to add messages to the application queue, but
it should not send them directly.
The local peer builds maps for concerns so that, when a request or reply arrives, the appropriate method is called in the context of the sender/receiver thread.
Manager¶
Concern settings and actions are grouped in a
distinct class, ConcernsManager.
The current implementation merges the manager into
the application class.
The class stores a dictionary of active concerns indexed by their
command id (NOT the name). Some built-in concerns are automatically added
via add_all_library_concerns()
and other concerns should be added using
add_concern()
.
The manager informs the Concern
that it is entering active duty by calling its
start()
method either when the
application is starting or, if added later, at
add_concern()
time.
Initialization of the manager goes through start_concerns(): the start()
method of each concern in the list is invoked at this time.
Termination of the manager is equally simple: the terminate() method of each
concern is called.
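The lifecycle rules above can be sketched as follows. This is a stand-alone illustration, not the library's code: `SketchManager` and `SketchConcern` are hypothetical stand-ins, and `terminate_concerns` is an assumed name for the manager's termination step. Only `add_concern()`, `start_concerns()`, `concerns_started`, `start()` and `terminate()` are names taken from this page.

```python
class SketchConcern:
    """Hypothetical stand-in for a concern; it records lifecycle calls."""

    def __init__(self, name, command_id):
        self.name = name
        self.command_id = command_id  # kept constant once installed
        self.events = []

    def start(self):
        self.events.append("start")

    def terminate(self):
        self.events.append("terminate")


class SketchManager:
    """Hypothetical stand-in for the manager's lifecycle handling."""

    def __init__(self):
        self.concerns = {}  # indexed by command id, not by name
        self.concerns_started = False

    def add_concern(self, concern):
        # The command id must not already be registered.
        assert concern.command_id not in self.concerns
        self.concerns[concern.command_id] = concern
        # Concerns added after start-up are started right away.
        if self.concerns_started:
            concern.start()

    def start_concerns(self):
        for concern in self.concerns.values():
            concern.start()
        self.concerns_started = True

    def terminate_concerns(self):  # method name is an assumption
        for concern in self.concerns.values():
            concern.terminate()
        self.concerns_started = False


manager = SketchManager()
early = SketchConcern("early", b"e")
late = SketchConcern("late", b"l")
manager.add_concern(early)    # not started yet
manager.start_concerns()      # starts `early`
manager.add_concern(late)     # started at add time
manager.terminate_concerns()
```

In this sketch each concern sees exactly one `start()` call, regardless of whether it was added before or after the manager was started.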
In the context of the local peer thread, on each loop, the manager participates with:
- execute_concerns(), which calls the execute() method of every concern it knows about; the concerns thus have the ability to generate messages spontaneously;
- process_requests(), for messages directed at the local peer;
- process_replies(), for replies to messages initiated by the local peer.
Message processing methods (both requests and replies) take their
input from application level queues. On each loop a number of messages
(not larger than PROCESS_LIMIT_PER_LOOP) will be de-queued,
the corresponding Concern
will be
located and asked to process_reply()
or to process_request()
. Either
can return a message which the local peer will enqueue (this is just
a convenience feature; the concern can enqueue the messages itself
and return None).
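A minimal sketch of the per-loop processing described above. It assumes messages are plain dictionaries with a `command` key and uses a simplified function signature; the real `process_common()` takes `(queue, label, reply)` and works on the library's own message type. `EchoConcern` and the value of `PROCESS_LIMIT_PER_LOOP` are illustrative only.

```python
import queue

PROCESS_LIMIT_PER_LOOP = 3  # illustrative value, not the library's


class EchoConcern:
    """Hypothetical concern that answers every request."""

    command_id = b"echo"

    def process_request(self, message):
        # Returning a message asks the caller to enqueue it.
        return {"command": self.command_id,
                "payload": message["payload"],
                "reply": True}

    def process_reply(self, message):
        return None  # nothing further to send


def process_common(concerns, in_queue, out_queue, reply):
    """De-queue at most PROCESS_LIMIT_PER_LOOP messages and dispatch them."""
    handled = 0
    while handled < PROCESS_LIMIT_PER_LOOP:
        try:
            message = in_queue.get_nowait()
        except queue.Empty:
            break
        handled += 1
        concern = concerns.get(message["command"])
        if concern is None:
            continue  # no concern registered for this command id
        handler = concern.process_reply if reply else concern.process_request
        response = handler(message)
        if response is not None:
            out_queue.put(response)
    return handled


concerns = {EchoConcern.command_id: EchoConcern()}
requests, outgoing = queue.Queue(), queue.Queue()
for i in range(5):
    requests.put({"command": b"echo", "payload": i})

first = process_common(concerns, requests, outgoing, reply=False)
second = process_common(concerns, requests, outgoing, reply=False)
```

With five queued requests and a limit of three, the first loop handles three messages and the second handles the remaining two.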
A Concern¶
The interface for concerns is defined in Concern
.
Users of the library will often create subclasses of it to implement
new types of messages.
The Concern
is informed about
events regarding its own life (start()
and terminate()
) and about the
life of the messages it handles (
message_sent()
,
send_failed()
,
message_dropped()
).
Its main functions are:
- to generate messages, either from execute() (called on each thread loop) or by using new methods (include compose in the name of the method for consistency);
- to reply to messages in process_request();
- to handle message responses in process_reply().
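The three functions listed above might look like this in a subclass. The sketch is self-contained, so `BaseConcern` is a hypothetical stand-in that only mirrors the documented constructor `(name, command_id, app=None)`; the dictionary messages and the `outbox` list on the application object are assumptions made for illustration.

```python
from types import SimpleNamespace


class BaseConcern:
    """Hypothetical stand-in mirroring the documented constructor."""

    def __init__(self, name, command_id, app=None):
        self.name = name
        self.command_id = command_id
        self.app = app


class PingConcern(BaseConcern):
    """Example concern: sends one ping, answers pings, records pongs."""

    def __init__(self, app=None):
        super().__init__(name="ping", command_id=b"pg", app=app)
        self.pending = True
        self.pongs = []

    def compose_ping(self):
        # "compose" in the method name follows the suggested convention.
        return {"command": self.command_id, "kind": "request",
                "payload": b"ping"}

    def execute(self):
        # Called on every loop; enqueue instead of sending directly.
        if self.pending:
            self.pending = False
            self.app.outbox.append(self.compose_ping())

    def process_request(self, message):
        # Receiver side: returning a message asks for it to be enqueued.
        return {"command": self.command_id, "kind": "reply",
                "payload": b"pong"}

    def process_reply(self, message):
        # Sender side: record the answer, nothing more to send.
        self.pongs.append(message["payload"])
        return None


app = SimpleNamespace(outbox=[])  # hypothetical application queue
concern = PingConcern(app=app)
concern.execute()
concern.execute()  # the second loop generates nothing
reply = concern.process_request(app.outbox[0])
concern.process_reply(reply)
```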
Built-in Concerns¶
Connector¶
The purpose of this concern is to monitor the list of peers for new entries and to attempt to establish a connection with them.
For peers in the initial state (those we have never attempted to connect to) that also have a host set, we create a message in which we also set our connection parameters.
The INITIAL state of the peer becomes CONNECTING only when we are
informed in message_sent()
that we were able to send the message. If we get
a failure via send_failed()
or
message_dropped()
the
state of the peer is set to NO_CONNECTION.
For peers in CONNECTING state (message sent but reply did not arrive), if the timeout has been exceeded, we also set the NO_CONNECTION state.
When we receive a connect request, the state of the originating peer is updated to either CONNECTED or ROUTED, depending on the path the request arrived on. The same thing happens when the reply to a previous request is received. The peer is left in a state appropriate for the heart-beat concern to pick up.
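The state transitions described in this section can be summarized in a small sketch. It follows the prose above and leaves out reconnect scheduling; `SketchPeer`, the helper function names, the `connect_deadline` field and the `direct` flag are all hypothetical and do not reflect the library's actual data model.

```python
import enum


class PeerState(enum.Enum):
    INITIAL = "initial"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    ROUTED = "routed"
    NO_CONNECTION = "no_connection"


class SketchPeer:
    """Hypothetical peer record with only the fields the sketch needs."""

    def __init__(self, host=None):
        self.host = host
        self.state = PeerState.INITIAL
        self.connect_deadline = None


def message_sent(peer, now, timeout):
    # INITIAL becomes CONNECTING only once the send is confirmed.
    if peer.state is PeerState.INITIAL:
        peer.state = PeerState.CONNECTING
        peer.connect_deadline = now + timeout


def send_failed_or_dropped(peer):
    peer.state = PeerState.NO_CONNECTION


def check_connecting(peer, now):
    # A CONNECTING peer whose reply did not arrive in time is given up on.
    if peer.state is PeerState.CONNECTING and now > peer.connect_deadline:
        peer.state = PeerState.NO_CONNECTION


def connect_message_received(peer, direct):
    # Applies both to incoming requests and to replies to our requests.
    peer.state = PeerState.CONNECTED if direct else PeerState.ROUTED


a = SketchPeer(host="10.0.0.1")
message_sent(a, now=100.0, timeout=5.0)
check_connecting(a, now=103.0)  # still within the timeout
state_before = a.state
check_connecting(a, now=106.0)  # timeout exceeded

b = SketchPeer(host="10.0.0.2")
connect_message_received(b, direct=False)  # arrived through another peer
```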
Heart-beats¶
The purpose of this concern is to monitor connectivity status for each peer in the list by regularly sending them small messages and measuring the time it takes for a reply to come back.
Using the execute()
hook the concern monitors the state of the peers which have
three relevant members:
- next_heart_beat_time: at this time (in seconds since Epoch) a heart beat request will be sent if nothing resets the timer until then;
- last_heart_beat_time: records the last time a message has been seen from this particular peer and is used to change the state of the peer to UNRESPONSIVE (if UNRESPONSIVE_THRESHOLD seconds have passed) or to NO_CONNECTION (if NO_CONNECTION_THRESHOLD seconds have passed).
- slow_heart_beat_down: when next_heart_beat_time is reached a message is sent and slow_heart_beat_down is increased by HEART_BEAT_SLOW_DOWN seconds, so the time between two consecutive heart-beat requests grows with each request (see
schedule_heart_beat()
).
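How the three members interact can be sketched as below. The member and constant names come from the list above, but the numeric values, the base interval `HEART_BEAT_INTERVAL`, the exact formula in `schedule_heart_beat()` and the helper `heart_beat_due()` are assumptions made for illustration; the library's actual scheduling may differ.

```python
HEART_BEAT_INTERVAL = 2.0       # assumed base interval, not from the source
HEART_BEAT_SLOW_DOWN = 1.0      # illustrative values
UNRESPONSIVE_THRESHOLD = 10.0
NO_CONNECTION_THRESHOLD = 30.0


class SketchPeer:
    """Hypothetical peer with the three heart-beat members."""

    def __init__(self, now):
        self.state = "CONNECTED"
        self.last_heart_beat_time = now
        self.slow_heart_beat_down = 0.0
        self.next_heart_beat_time = now + HEART_BEAT_INTERVAL


def schedule_heart_beat(peer, now):
    # Assumed formula: base interval plus the accumulated slow-down.
    peer.next_heart_beat_time = (
        now + HEART_BEAT_INTERVAL + peer.slow_heart_beat_down)


def heart_beat_due(peer, now):
    """Return True if a heart-beat request should be sent for this peer."""
    if now < peer.next_heart_beat_time:
        return False
    silence = now - peer.last_heart_beat_time
    if silence >= NO_CONNECTION_THRESHOLD:
        peer.state = "NO_CONNECTION"
        return False  # left to the connector concern from here on
    if silence >= UNRESPONSIVE_THRESHOLD:
        peer.state = "UNRESPONSIVE"
    # Each request sent stretches the gap before the next one.
    peer.slow_heart_beat_down += HEART_BEAT_SLOW_DOWN
    schedule_heart_beat(peer, now)
    return True


def message_seen(peer, now):
    # A message from the peer resets the timers (assumed behaviour).
    peer.last_heart_beat_time = now
    peer.slow_heart_beat_down = 0.0
    schedule_heart_beat(peer, now)


# Simulate a peer that never answers, checking once per second.
peer = SketchPeer(now=0.0)
sent_at = [t for t in range(40) if heart_beat_due(peer, float(t))]
```

With these illustrative values the requests go out at 2, 5, 9, 14, 20 and 27 seconds, so the gap grows by one second each time, and the silent peer ends up in NO_CONNECTION once 30 seconds have passed.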
Ask-Around¶
Used for peers that we don’t know. See Routing for details about its workings.
-
class
p2p0mq.concerns.manager.
ConcernsManager
(*args, **kwargs)[source]¶ Manages the concerns inside the local peer.
-
concerns_started
¶ Flag to tell if the start method has been called. This is used to determine whose responsibility it is to call start() on newly added concerns.
Type: bool
-
add_all_library_concerns
()[source]¶ Creates instances of some of the concerns defined in this package and adds them to the list.
The concerns added are:
The concerns defined in this package but not included by default are:
-
add_concern
(concern)[source]¶ Adds a single concern to the list.
Parameters: concern (Concern) – The new concern to add. It is asserted that the command id is not present in the dictionary.
-
execute_concerns
()[source]¶ Execute concerns.
Called on each execute step by the local peer. Call each concern’s execute method in turn.
-
process_common
(queue, label, reply)[source]¶ Called on the local peer thread to process requests and replies.
-
process_replies
(queue)[source]¶ Called on the local peer thread to process replies.
Replies are received by the server (Receiver) and are simply deposited in the queue. This function takes the replies and delivers them to concerned handlers.
-
process_requests
(queue)[source]¶ Called on the local peer thread to process requests.
Requests are received by the server (Receiver) and are simply deposited in the queue. This function takes the requests and delivers them to concern handlers.
-
-
class
p2p0mq.concerns.base.
Concern
(name, command_id, app=None, *args, **kwargs)[source]¶ Base class for concerns.
-
command_id
¶ A unique command id used as part of the messages initiated by this concern. Same id is used both for requests and replies.
Type: bytes
-
name
¶ Human readable name of this concern.
Type: str
-
app
¶ The manager where this concern is installed.
Type: ConcernsManager
-
message_dropped
(message)[source]¶ We are informed that one of our messages was dropped.
This call is made in the context of the sending thread when the time-to-live of the message has expired. Unlike
send_failed()
, this method cannot return a message to be re-queued.
Parameters: message (Message) – The message that was dropped.
-
message_sent
(message)[source]¶ We are informed that one of our messages was sent.
This call is made in the context of the sending thread.
Parameters: message (Message) – The message that was sent.
-
process_reply
(message)[source]¶ Handler on the sender side for replies.
Parameters: message (Message) – The message that has been received.
-
process_request
(message)[source]¶ Handler on the receiver side for requests.
Parameters: message (Message) – The message that has been received.
-
send_failed
(message, exc=None)[source]¶ We are informed that one of our messages failed to send.
This call is made in the context of the sending thread and only if the time-to-live of the message has not expired. Otherwise, a call to message_dropped() is made.
By returning the same message the concern essentially implements a retry-until-expires mechanism.
Parameters: - message (Message) – The message that failed to send.
- exc (Exception) – The exception that was raised, if any.
Returns: the message to be re-queued (can be the same message). This is NOT a (PRIORITY, message) type of reply.
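A minimal sketch of the retry-until-expires mechanism mentioned above. `attempt_send()` is a hypothetical stand-in for the sender thread, and the `expires_at` field and dictionary messages are assumptions; only the `send_failed()`, `message_sent()` and `message_dropped()` hooks come from this page.

```python
class RetryingConcern:
    """Hypothetical concern that retries a failed message until it expires."""

    def __init__(self):
        self.failures = 0
        self.sent = []
        self.dropped = []

    def send_failed(self, message, exc=None):
        self.failures += 1
        return message  # a plain message, not a (PRIORITY, message) tuple

    def message_sent(self, message):
        self.sent.append(message)

    def message_dropped(self, message):
        self.dropped.append(message)  # nothing can be re-queued from here


def attempt_send(concern, message, send, now):
    """Stand-in for the sender thread's handling of one message."""
    if now >= message["expires_at"]:  # field name is an assumption
        concern.message_dropped(message)
        return None
    try:
        send(message)
    except OSError as exc:
        # Not expired yet: the concern may hand back a message to re-queue.
        return concern.send_failed(message, exc)
    concern.message_sent(message)
    return None


def always_fails(message):
    raise OSError("peer unreachable")


concern = RetryingConcern()
message = {"payload": b"hello", "expires_at": 3.0}
pending = message
now = 0.0
while pending is not None:
    pending = attempt_send(concern, pending, always_fails, now)
    now += 1.0
```

In this run the message fails three times, at 0, 1 and 2 seconds, and is dropped on the fourth attempt because its time-to-live has run out.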
-
start
()[source]¶ Called by the
ConcernsManager
to inform the concern that it was installed.
For concerns installed before the local peer has been started, this method is called before entering the main loop. The sender and the receiver are not started at that time.
-
terminate
()[source]¶ Called by the
ConcernsManager
to inform the concern that it was uninstalled.
At this point the main loop has been exited and the receiver and the sender have been stopped.
-
-
class
p2p0mq.concerns.connector.
ConnectorConcern
(*args, **kwargs)[source]¶ Connects peers.
We continuously check for peers in initial state or peers that were requested to connect.
For peers in initial state we instruct the sender to connect to them and, once connected, we send them the greetings message.
-
connect_peer
(peer, first=True)[source]¶ Take steps to connect a peer.
Parameters: - peer (Peer) – The peer to connect to.
- first (bool) – tells if this is a connect or reconnect attempt.
-
connecting_peer
(peer)[source]¶ Check a peer we attempted to connect.
If the timeout set when the message was created has been exceeded, the peer is marked as not connected. A new connection attempt is also scheduled for the future through reconnect_peer().
Parameters: peer (Peer) – The peer in question.
-
declare_no_connection
(peer)[source]¶ We mark a peer as impossible to connect to.
A reconnect attempt will also be scheduled some seconds into the future. At that point reconnect_peer() will be invoked.
Parameters: peer (Peer) – The peer in question.
-
execute
()[source]¶ Called from application thread on each thread loop.
The method will look into all peers and decide actions based on their state:
- for new peers in INITIAL state a message is enqueued and state is changed to CONNECTING;
- for CONNECTING peers, if the timeout is exceeded the state is changed to NO_CONNECTION;
- for peers in NO_CONNECTION state a reconnect is attempted if the time is right.
Any peer that doesn’t have a host set is ignored.
-
message_dropped
(message)[source]¶ We are informed that one of our messages was dropped.
This call is made in the context of the sending thread.
-
message_sent
(message)[source]¶ We are informed that one of our messages was sent.
This call is made in the context of the sending thread.
-
process_reply
(message)[source]¶ A request to connect has been accepted by the peer.
This is the handler on the sender side for connect requests. We update the details based on the information in the reply and change state of the peer to either CONNECTED or ROUTED depending on the path the message has arrived on.
Parameters: message (Message) – The message to process.
-
process_request
(message)[source]¶ A peer requests to connect to local peer.
This is the handler on the receiver side for connect requests. If we know this peer, we update its details, such as host and port. If we don’t, we create a new Peer and add it to the local peer.
A new connection is attempted right away with the new details if the state is INITIAL, NO_CONNECTION, or UNREACHABLE. For other states (CONNECTING, CONNECTED, ROUTED) we change the state to either CONNECTED or ROUTED, depending on the path the message arrived on.
A reply is composed with our details (host, port) and is sent back along the path the request came from.
Parameters: message (Message) – The message to process.
-
reconnect_peer
(peer)[source]¶ Re-attempt to connect a peer that failed before.
Parameters: peer (Peer) – The peer in question.
-
send_failed
(message, exc=None)[source]¶ We are informed that one of our messages failed to send.
This call is made in the context of the sending thread.
Warning
As the handling of this message is special (see
p2p0mq.app.client.Sender.connect_peers()
) this method is prohibited from re-issuing a message by returning it.
-
-
class
p2p0mq.concerns.heart_beat.
HeartBeatConcern
(*args, **kwargs)[source]¶ Manages the heart-beat signal between peers.
-
compose_heart_beat_request
(peer)[source]¶ Creates a request for a heartbeat.
Parameters: peer (Peer) – The peer we should send the message to.
-
execute
()[source]¶ Called from application thread on each thread loop.
We go through each CONNECTED, ROUTED or UNREACHABLE peer and send a heart beat message if the timeout has been reached.
-
expired_peer
(peer, messages)[source]¶ A peer that has passed its heart-beat time.
If the time since last message is large enough either the NO_CONNECTION or UNRESPONSIVE state is set.
For peers in NO_CONNECTION state no heart-beat will be scheduled, as it is the responsibility of ConnectorConcern to bring them out of that state.
UNRESPONSIVE peers are only sent heart beat messages until they exit this state.
Parameters: - peer (Peer) – The peer whose heart-beat timeout has expired.
- messages (list) – The list where we append any messages we decide need sending.
-