Distributed Communication
=========================
The distributed communication component provides a general message-passing mechanism
for building various communication topologies in a distributed environment.
Besides the essential communication primitives, it also provides
peer discovery, (partial) fault recovery, and conditional event auto-dispatching.
Proxy
-----
Proxy is the primary entity of the communication component.
It implements the communication primitives behind a uniform interface,
while the underlying driver is pluggable to suit actual requirements.
Currently, ZeroMQ is the default driver.
Proxy also supports peer discovery based on Redis.

.. image:: ../images/distributed/proxy.svg
   :target: ../images/distributed/proxy.svg
   :alt: Proxy

Message
^^^^^^^
A message is a general-purpose container
that packages the content exchanged between components.
The main attributes of a message instance include:
* ``tag``\ : A user-defined attribute that can be used to implement the auto-dispatching logic
with a `conditional event register table <#conditional-event-register-table>`_.
* ``source``\ : The alias of the message sender.
* ``destination``\ : The alias of the message receiver.
* ``payload``\ : A Python object for remote function call.
* ``session_id`` (auto-generated): UUID of a specific session.
One session may include multiple messages.
* ``message_id`` (auto-generated): UUID of a specific message.

.. code-block:: python

   from maro.communication import Message

   message = Message(tag="check_in",
                     source="worker_001",
                     destination="master",
                     payload="")

Session Message
^^^^^^^^^^^^^^^
We provide two kinds of predefined session types for common distributed scenarios:
* **Task Session**\ : It is used to describe a computing task sent from master to worker.
Three stages are included:
#. The master sends the task request(s) to the worker(s);
#. Once the worker(s) receive the task(s), they start working on them;
#. The worker(s) return the computing result(s) to the master.
* **Notification Session**\ : It is used for information syncing and only includes two stages:
#. The sender sends out the notification message;
#. The receiver(s) receive the notification message.
The stages of each session are maintained internally by the proxy.

.. code-block:: python

   from maro.communication import SessionMessage, SessionType

   task_message = SessionMessage(tag="sum",
                                 source="master",
                                 destination="worker_001",
                                 payload=[0, 1, 2, ...],
                                 session_type=SessionType.TASK)

   notification_message = SessionMessage(tag="check_out",
                                         source="worker_001",
                                         destination="master",
                                         payload="",
                                         session_type=SessionType.NOTIFICATION)

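
The stage bookkeeping described above can be pictured as a small state machine.
The following is a minimal sketch, not MARO's implementation: the enum names
``ToyTaskStage`` and ``ToyNotificationStage`` and the helper ``next_stage`` are
hypothetical and only mirror the three task stages and two notification stages listed above.

.. code-block:: python

   from enum import Enum


   class ToyTaskStage(Enum):
       """Hypothetical names for the three task-session stages."""
       REQUEST = 1   # master sends the task request
       RECEIVE = 2   # worker has received the task and is working on it
       COMPLETE = 3  # worker has returned the result


   class ToyNotificationStage(Enum):
       """Hypothetical names for the two notification-session stages."""
       REQUEST = 1   # sender sends out the notification
       RECEIVE = 2   # receiver has received it


   def next_stage(stage):
       """Return the stage that follows ``stage`` within its own session type."""
       members = list(type(stage))
       position = members.index(stage)
       if position + 1 == len(members):
           raise ValueError(f"{stage.name} is already the final stage")
       return members[position + 1]


   # Walk a task session through all of its stages.
   stage = ToyTaskStage.REQUEST
   stage = next_stage(stage)
   stage = next_stage(stage)

In this sketch a session can only move forward, and asking for the stage after the final one raises an error.
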
Communication Primitives
^^^^^^^^^^^^^^^^^^^^^^^^
Proxy provides a set of general-purpose communication primitives that support
both blocking and non-blocking cases. These primitives are decoupled from
the underlying implementation of the communication driver (protocol).
The main primitives are listed below:
* ``send``\ : Unicast. A blocking, one-to-one sending mode
that waits for and collects the reply message from the remote peer.
* ``isend``\ : The non-blocking version of ``send``.
The message session ID is returned immediately and can be passed to ``receive_by_id``.
* ``scatter``\ : An advanced version of ``send``. It sends message(s) to peer(s)
and waits for and collects the reply message(s) from the peer(s).
``scatter`` is not a real multicast: each message goes through the full TCP/IP
stack (ZeroMQ driver). If the message to send is identical for all peers and
you want better performance, use the ``broadcast`` interface instead.
* ``iscatter``\ : The non-blocking version of ``scatter``. The related message
session ID(s) are returned immediately and can be passed to ``receive_by_id``.
* ``broadcast``\ : A blocking call that broadcasts the message
to all subscribers and then waits for and collects all subscribers' reply messages.
* ``ibroadcast``\ : The non-blocking version of ``broadcast``. The related message
session IDs are returned immediately and can be passed to ``receive_by_id``.
* ``receive``\ : Continually receives messages.
* ``receive_by_id``\ : Receives only the message(s) with the given session ID(s).
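
To make the relationship between the blocking and non-blocking primitives concrete,
here is a minimal in-memory sketch. ``ToyProxy`` and all of its methods are hypothetical
stand-ins written for this illustration; they are not the MARO ``Proxy`` API and their
signatures are assumptions. Everything runs synchronously in one process, so the sketch
only shows the call shape: the ``i``-prefixed calls hand back session IDs, and the replies
are fetched later by ID.

.. code-block:: python

   import uuid
   from collections import defaultdict


   class ToyProxy:
       """In-memory stand-in for illustration only; not the MARO Proxy."""

       def __init__(self, name):
           self.name = name
           self._peers = {}                   # alias -> ToyProxy
           self._replies = defaultdict(list)  # session ID -> reply payloads

       def connect(self, peer):
           self._peers[peer.name] = peer
           peer._peers[self.name] = self

       def _on_request(self, source, session_id, payload):
           # Hypothetical peer behaviour: reply with the sum of the payload.
           self._peers[source]._replies[session_id].append(sum(payload))

       def isend(self, destination, payload):
           """Dispatch one message and return its session ID immediately."""
           session_id = str(uuid.uuid4())
           self._peers[destination]._on_request(self.name, session_id, payload)
           return session_id

       def receive_by_id(self, session_ids):
           """Collect only the replies that belong to the given session IDs."""
           return [r for sid in session_ids for r in self._replies.pop(sid, [])]

       def send(self, destination, payload):
           """Blocking form: send, then collect the reply before returning."""
           return self.receive_by_id([self.isend(destination, payload)])

       def iscatter(self, payloads_by_peer):
           """Send a different payload to each peer; return all session IDs."""
           return [self.isend(dest, payload) for dest, payload in payloads_by_peer]


   master = ToyProxy("master")
   for alias in ("worker_001", "worker_002"):
       master.connect(ToyProxy(alias))

   # Blocking: the replies come back from the call itself.
   replies = master.send("worker_001", [0, 1, 2])

   # Non-blocking shape: keep the session IDs, fetch the replies later.
   session_ids = master.iscatter([("worker_001", [1, 2]), ("worker_002", [3, 4])])
   results = master.receive_by_id(session_ids)

In this sketch ``send`` is simply ``isend`` followed by ``receive_by_id``, which is one way to read the pairing of each blocking primitive with its non-blocking version.
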
Conditional Event Register Table
--------------------------------
The conditional event register table provides a message auto-dispatching mechanism.
Once a ``conditional event`` and its ``handler function`` are registered in
the table, the handler function is executed automatically
with the received messages whenever the event conditions are met.

.. image:: ../images/distributed/register_table.register.svg
   :target: ../images/distributed/register_table.register.svg
   :alt: Register Table

``Conditional event`` declares the group of messages required to
auto-trigger the related handler function.
The unit event is the minimal component of a conditional event.
It follows a three-part format: ``source:tag:amount``.
* ``source``\ : Declares the required message source.
Longest-prefix matching is supported.
* ``*`` represents any source.
* ``tag``\ : The ``tag`` attribute of the message instance.
* ``*`` represents any tag.
* ``amount``\ : The required number of message instances.
Both an absolute integer and a relative percentage are valid for this field.
* ``%`` denotes a relative percentage, such as 60% or 10%.

.. code-block:: python

   unit_event_abs = "worker:update:10"

   unit_event_rel = "worker:update:60%"

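
As an illustration of the three-part format, the sketch below splits a unit event string
into its fields and resolves the ``amount`` into a message count. The names ``UnitEvent``,
``parse_unit_event`` and ``required_count`` are hypothetical and not part of MARO, and
rounding a percentage up to the next whole message is an assumption made for this example.

.. code-block:: python

   from dataclasses import dataclass


   @dataclass(frozen=True)
   class UnitEvent:
       source: str
       tag: str
       amount: int
       is_percentage: bool


   def parse_unit_event(text):
       """Split ``source:tag:amount`` into its three fields."""
       parts = text.split(":")
       if len(parts) != 3:
           raise ValueError(f"expected 'source:tag:amount', got {text!r}")
       source, tag, amount = parts
       is_percentage = amount.endswith("%")
       value = int(amount[:-1] if is_percentage else amount)
       if value <= 0 or (is_percentage and value > 100):
           raise ValueError(f"invalid amount in {text!r}")
       return UnitEvent(source, tag, value, is_percentage)


   def required_count(event, peer_count):
       """Messages needed, given how many peers match the event source."""
       if event.is_percentage:
           # Ceiling division (assumption): 60% of 7 peers means 5 messages.
           return -(-event.amount * peer_count // 100)
       return event.amount


   absolute = parse_unit_event("worker:update:10")
   relative = parse_unit_event("worker:update:60%")
   needed = required_count(relative, peer_count=5)
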
To support more complex business logic,
we provide two operations, ``AND`` and ``OR``, to combine unit events:
* ``AND``\ : Valid for multiple unit events and combined unit events.
The combined event condition is met if all the conditions of the sub-events are met.
* ``OR``\ : Valid for multiple unit events and combined unit events.
The combined event condition is met if any sub-event meets the condition.

.. code-block:: python

   combined_event_and = ("worker_01:update:2",
                         "worker_02:update:3",
                         "AND")

   combined_event_or = ("worker_03:update:1",
                        "worker_04:update:5",
                        "OR")

   combined_event_mix = (("worker_01:update:2", "worker_02:update:3", "AND"),
                         "worker_03:update:1",
                         "OR")

``Handler function`` is a user-defined callback function that is bound to
a specific conditional event. When the condition of the event is met,
the related messages are passed to the handler function for execution.
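
Deciding whether a condition is met can be pictured as a recursive check over unit events
and ``AND``/``OR`` combinations. The sketch below is an assumption-laden illustration rather
than MARO's implementation: ``unit_event_met`` and ``event_met`` are hypothetical helpers,
received messages are reduced to ``(source, tag)`` pairs, source matching is simplified to a
plain prefix test, and percentage amounts are not handled.

.. code-block:: python

   def unit_event_met(unit_event, received):
       """Check one ``source:tag:amount`` event with an absolute amount."""
       source, tag, amount = unit_event.split(":")
       count = sum(
           1
           for msg_source, msg_tag in received
           if (source == "*" or msg_source.startswith(source))
           and (tag == "*" or msg_tag == tag)
       )
       return count >= int(amount)


   def event_met(event, received):
       """Evaluate a unit event (str) or a combined event (nested tuple)."""
       if isinstance(event, str):
           return unit_event_met(event, received)
       *sub_events, operator = event
       results = (event_met(sub, received) for sub in sub_events)
       if operator == "AND":
           return all(results)
       if operator == "OR":
           return any(results)
       raise ValueError(f"unknown operator: {operator!r}")


   # Two updates from worker_01 and three from worker_02 have arrived.
   received = [("worker_01", "update")] * 2 + [("worker_02", "update")] * 3

   and_met = event_met(("worker_01:update:2", "worker_02:update:3", "AND"), received)
   or_met = event_met(("worker_03:update:1", "worker_04:update:5", "OR"), received)
   mix_met = event_met(
       (("worker_01:update:2", "worker_02:update:3", "AND"), "worker_03:update:1", "OR"),
       received,
   )

With these messages the ``AND`` event and the mixed event are satisfied, while the ``OR`` event is not, because nothing has arrived from ``worker_03`` or ``worker_04``.
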

.. image:: ../images/distributed/register_table.trigger.svg
   :target: ../images/distributed/register_table.trigger.svg
   :alt: Register Table


.. code-block:: python

   # A common handler function signature
   def handler(that, proxy, messages):
       """
       Conditional event handler function.

       Args:
           that: reference to the local instance to operate on.
           proxy: the proxy reference for remote communication.
           messages: received messages.
       """
       pass

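
To show how registration and triggering fit together, here is a minimal sketch of a register
table. ``ToyMessage`` and ``ToyRegisterTable`` are hypothetical names, not MARO classes, and
the sketch assumes unit events with absolute amounts and simple prefix matching on the source.
Handlers use the ``(that, proxy, messages)`` signature shown above.

.. code-block:: python

   from collections import namedtuple

   ToyMessage = namedtuple("ToyMessage", ["source", "tag", "payload"])


   class ToyRegisterTable:
       """Illustrative only; handles unit events with absolute amounts."""

       def __init__(self):
           self._entries = []  # (source, tag, amount, handler)
           self._pending = []  # messages not yet consumed by a handler

       def register(self, unit_event, handler):
           source, tag, amount = unit_event.split(":")
           self._entries.append((source, tag, int(amount), handler))

       def push(self, message, that=None, proxy=None):
           """Buffer a message, then run every handler whose condition is met."""
           self._pending.append(message)
           for source, tag, amount, handler in self._entries:
               matched = [
                   m for m in self._pending
                   if (source == "*" or m.source.startswith(source))
                   and (tag == "*" or m.tag == tag)
               ]
               if len(matched) >= amount:
                   batch = matched[:amount]
                   consumed = {id(m) for m in batch}
                   self._pending = [m for m in self._pending if id(m) not in consumed]
                   handler(that, proxy, batch)


   collected = []


   def on_update(that, proxy, messages):
       collected.append([m.payload for m in messages])


   table = ToyRegisterTable()
   table.register("worker:update:2", on_update)
   table.push(ToyMessage("worker_01", "update", 1))  # condition not met yet
   table.push(ToyMessage("worker_02", "update", 2))  # second message triggers the handler

The handler runs once, on the second push, and receives both buffered messages together.
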
Distributed Decorator
---------------------
The distributed decorator is a helper that generates a distributed worker class
from a local functional class.

.. code-block:: python

   from maro.communication import dist, Proxy

   # Initialize proxy instance for remote communication.
   proxy = Proxy(group_name="master-worker",
                 component_type="worker",
                 expected_peers=[("master", 1)])

   # Declare the trigger condition of rollout event.
   rollout_event = "master:rollout:1"

   # Implement rollout event handler logic.
   def on_rollout(that, proxy, messages):
       pass

   # Assemble event-handler dictionary.
   handler_dict = {rollout_event: on_rollout}

   # Convert a local functional class to a distributed one.
   @dist(proxy, handler_dict)
   class Worker:
       def __init__(self):
           pass

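
For readers unfamiliar with class decorators, the sketch below shows one way such a helper
could wrap a local class. It is not the ``dist`` implementation: ``toy_dist``, the ``Wrapper``
class, its ``local_instance`` attribute and its ``dispatch`` method are all hypothetical, no
real proxy is involved, and only events that require a single message are handled.

.. code-block:: python

   def toy_dist(proxy, handler_dict):
       """Hypothetical decorator factory; illustrates the wrapping idea only."""

       def decorate(local_class):
           class Wrapper:
               def __init__(self, *args, **kwargs):
                   # Keep the undecorated object so handlers can operate on it.
                   self.local_instance = local_class(*args, **kwargs)
                   self.proxy = proxy
                   self.handlers = dict(handler_dict)

               def dispatch(self, source, tag, payload):
                   """Run each handler whose event matches this single message."""
                   for event, handler in self.handlers.items():
                       event_source, event_tag, _amount = event.split(":")
                       if source.startswith(event_source) and tag == event_tag:
                           handler(self.local_instance, self.proxy, [payload])

           return Wrapper

       return decorate


   def on_rollout(that, proxy, messages):
       that.rollouts += len(messages)


   # proxy=None keeps the example self-contained; a real proxy is assumed elsewhere.
   @toy_dist(proxy=None, handler_dict={"master:rollout:1": on_rollout})
   class Worker:
       def __init__(self):
           self.rollouts = 0


   worker = Worker()
   worker.dispatch("master", "rollout", {"episode": 0})

After the call, the wrapped local instance has recorded one rollout, while the class body itself contains no communication code.
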