5. Distributed Programming (netpycos)

As described in Asynchronous Concurrent Programming (pycos), when a task is created, a Pycos (singleton) instance is created if one does not already exist. Pycos starts a scheduler that manages the tasks created in that program. The Pycos scheduler in the pycos module does not start network services for distributed programming. However, when netpycos is used instead of pycos, network services are started so that tasks can send messages to and receive messages from tasks in remote pycos instances, and use their other network services. See also Distributed Communicating Tasks (dispycos), which supports distributed and parallel computing: clients can send computations (Python functions, files, modules etc.) to remote servers to execute tasks there.

Network services, such as message passing, work transparently so that once references are obtained, they can be used just as with concurrent programming.

5.1. Examples

See Distributed Programming in the tutorial for examples. There are many illustrative use cases in the ‘examples’ directory under the directory where the pycos module is installed.

The following is a brief description of the examples relevant to this section:

5.2. Location

Some entities in pycos, such as tasks and channels, can provide services to remote peers. These entities should register themselves with names so that remote peers can use those names to get references to them. These references can then be used to invoke the methods described in this section.

Each such entity has a location, i.e., the network IP address and TCP port of the pycos where it originates, so that when a method is invoked on its reference at a remote peer, that peer knows where to direct the call.

When such an entity is located (to get a reference) at a remote peer using only the name it is registered with, all known pycos instances are queried for that name. If the name is unique across all pycos instances, the reference obtained is for that specific entity. However, if the same name is used in more than one pycos, the reference obtained may not be for the desired entity. In such a case, the location of the pycos where the entity is registered can be given. Locations can be obtained by looking up the names of pycos instances with scheduler.locate(). If the host and TCP port are already known, a location can be created with:

class netpycos.Location(host, tcp_port)

host is either a host name or an IP address, and tcp_port is the TCP port where pycos is running. This information is printed by the scheduler when it starts.
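A minimal sketch of building a Location for a known peer and using it to restrict a lookup, assuming pycos is installed and imported as `import pycos.netpycos as pycos`. The address 192.0.2.10 (a documentation range), TCP port 9706 and the task name 'server_task' are hypothetical; the import guard only lets the definitions load where pycos is missing.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def find_server(host='192.0.2.10', tcp_port=9706, task=None):
    # Hypothetical host/port: use the values the remote scheduler prints at start.
    loc = pycos.Location(host, tcp_port)
    # Only the pycos at loc is queried, so the same name elsewhere is ignored.
    rtask = yield pycos.Task.locate('server_task', location=loc, timeout=5)
    if not isinstance(rtask, pycos.Task):  # success yields a Task instance
        print('server_task not found at %s' % (loc,))
        return
    rtask.send('hello')  # asynchronous send to the remote task
```

It would be started like any other task, e.g. with pycos.Task(find_server), once the scheduler has been initialized.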

5.3. Pycos

As explained in Asynchronous Concurrent Programming (pycos), the Pycos scheduler is started automatically (when a task is started, for example) with default initialization. If the scheduler should be initialized with different parameters, it must be initialized before any tasks, channels etc. are created, with:

class netpycos.Pycos(udp_port=9705, tcp_port=None, node=None, ext_ip_addr=None, socket_family=None, ipv4_udp_multicast=False, name=None, discover_peers=True, secret='', certfile=None, keyfile=None, notifier=None, dest_path_prefix=None, max_file_size=None)

node must be either a host name or an IP address (in IPv4 or IPv6 format), or a list of host names or IP addresses. If it is a list, pycos starts network services on each address in the list.

udp_port is the port number where pycos listens for broadcast messages to find other peers. Its default value is 9705.

tcp_port is the port number used for TCP services, where pycos instances exchange messages. By default (None), any available port is used. Setting it to a specific number can be useful when dealing with firewalls or port forwarding.

ipv4_udp_multicast can be used to enable multicast (instead of broadcast) over IPv4 UDP to discover peers. The default value of False disables multicast and uses broadcast instead. This option must be the same for all peers (i.e., a peer that uses multicast only detects other peers that use multicast, not those that use broadcast). With IPv6, only multicast is used.

name, if given, must be a string that is unique for each pycos instance. If it is not given, it is set to a string representing the location where network services are running. Using name, the location where a specific pycos instance is running can be found with the scheduler's locate(name) method. That location can then be used to find and use services, such as sending messages to tasks and channels running there.

If discover_peers is True (default), the pycos scheduler broadcasts a message to detect peers running on the local network. If it is False, that message is not broadcast, so this pycos does not detect peers and peers do not detect it; message passing with remote tasks, channels etc. is then possible only with peers that are known, e.g., added explicitly with the peer method described below.

secret is a string that must be the same for all pycos instances that communicate. It is used to compute a hash that is included and checked in all communication, so that only pycos instances that use the same secret can be peers. Note that messages are not encrypted; if message security is needed, the SSL options certfile and/or keyfile can be used.

certfile and keyfile are as used by Python's ssl module. See SSL (Security / Encryption) for more information.

ext_ip_addr must be the IP address (in IPv4 or IPv6 format) of the gateway/firewall when pycos runs behind that gateway/firewall and other peers are outside it. With this option, peers on remote networks can communicate with a pycos running on a local network. If node is a list, ext_ip_addr can also be a list, where each element corresponds to the element at the same position in the node list. ext_ip_addr may be shorter than the node list, in which case the addresses in node without a corresponding ext_ip_addr element are not configured to use an external IP address, i.e., are not masqueraded.
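The positional pairing of the node and ext_ip_addr lists can be illustrated with a small pure-Python helper. pair_ext_addresses is hypothetical and not part of pycos, and the addresses are from documentation/private ranges; the same lists would be passed to the scheduler as, e.g., pycos.Pycos(node=['10.0.0.5', '192.168.1.5'], ext_ip_addr=['203.0.113.7']).

```python
def pair_ext_addresses(nodes, ext_ip_addrs):
    """Pair each node address with its external (masqueraded) address.

    Models the documented rule only: ext_ip_addrs may be shorter than nodes,
    and nodes without a counterpart get None (not masqueraded).
    """
    # A longer ext list is not covered by the text; extras are ignored here.
    return [(node, ext_ip_addrs[i] if i < len(ext_ip_addrs) else None)
            for i, node in enumerate(nodes)]


print(pair_ext_addresses(['10.0.0.5', '192.168.1.5'], ['203.0.113.7']))
# [('10.0.0.5', '203.0.113.7'), ('192.168.1.5', None)]
```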

dest_path is the path where files sent by remote peers with send_file() (see below) are saved. If dest_path is not set, pycos initializes it to tempfile.gettempdir(). If send_file() is given a relative path as dir, the file is saved under dest_path + dir.

max_file_size is the maximum size, in bytes, of a transferred file. If it is None or 0, there is no limit; if it is a number, any file bigger than that limit is rejected when send_file() is called.
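The receiving-side rules for dest_path, dir and max_file_size can be modelled in plain Python. This is only an illustration: save_location and accepts_size are hypothetical helpers, and keeping the sent file's base name under dest_path + dir is an assumption.

```python
import os
import tempfile


def save_location(file, dir=None, dest_path=None):
    """Path where a peer would store a file received via send_file().

    Illustrative only; keeping the file's base name is an assumption.
    """
    base = dest_path or tempfile.gettempdir()  # documented default
    if dir is not None:
        if os.path.isabs(dir):
            raise ValueError('dir must be a relative path')
        base = os.path.join(base, dir)  # dest_path + dir
    return os.path.join(base, os.path.basename(file))


def accepts_size(size, max_file_size=None):
    """None or 0 means no limit; otherwise bigger files are rejected."""
    return not max_file_size or size <= max_file_size
```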

Note that a Python program can't use multiple cores on an SMP node other than by using multiprocessing. Instead of multiprocessing, multiple programs, each using pycos, can be started on a machine with multiple cores (as many as needed), all on the same udp_port but each with a different tcp_port; see the ‘dispycos’ module, which starts multiple instances on a node with multiple processors. If tcp_port is set explicitly, each instance must use a unique tcp_port (but the same udp_port); if tcp_port is not set explicitly, pycos chooses a different tcp_port for each instance. Within each such program, tasks can use Asynchronous Concurrent Programming (pycos) and the API below to exchange messages with tasks in all programs: although all programs run on the same machine, tasks in one program are distributed/remote for tasks in another program.
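As a small planning sketch for the SMP setup above: the helper below produces one set of scheduler options per program, all sharing udp_port. smp_instance_options, the 'worker' names and base port 9710 are hypothetical. Program i would then create its scheduler before any task or channel, e.g. scheduler = pycos.Pycos(secret='...', **options[i]), using the same secret in every program.

```python
def smp_instance_options(count, name_prefix, base_tcp_port=None, udp_port=9705):
    """Scheduler options for `count` pycos programs on one multi-core node.

    All instances share udp_port. With base_tcp_port=None, tcp_port is left
    unset so each instance picks its own; otherwise distinct ports are used.
    """
    options = []
    for i in range(count):
        tcp_port = None if base_tcp_port is None else base_tcp_port + i
        options.append({'udp_port': udp_port, 'tcp_port': tcp_port,
                        'name': '%s-%d' % (name_prefix, i)})
    return options


for opts in smp_instance_options(3, 'worker', base_tcp_port=9710):
    print(opts)
```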

When netpycos is used instead of pycos, the following additional methods are available in Pycos:

scheduler.locate(name, timeout=None)

Note

This method must be used with yield. While the locate methods of tasks, channels etc. are static methods (invoked on the class), this method should be invoked on the scheduler instance (which can be obtained, for example, with scheduler = Pycos()).

Finds and returns the location where the peer Pycos with the given name is running. If no Pycos with that name is currently running, it waits until a peer with that name starts. This location can be used in other methods. timeout is the maximum number of seconds (possibly fractional) to wait; if it elapses before a peer with that name is found (e.g., none is running or reachable), None is returned.
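A sketch of resolving a peer by name and then querying only that peer, assuming a remote program started its scheduler with pycos.Pycos(name='server') and registered a task as 'server_task'; both names are hypothetical.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def use_named_peer(peer_name='server', task=None):
    scheduler = pycos.Pycos()  # the singleton scheduler instance
    # Instance method (not static); wait at most 10 seconds for the peer.
    loc = yield scheduler.locate(peer_name, timeout=10)
    if loc is None:
        print('peer %r not found' % peer_name)
        return
    # Query only that peer, even if other peers use the same task name.
    rtask = yield pycos.Task.locate('server_task', location=loc, timeout=5)
    if isinstance(rtask, pycos.Task):
        rtask.send('ping')
```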

scheduler.peer(loc, udp_port=9705, stream_send=False, relay=False)

Note

This method must be used with yield.

Contacts the pycos at the given loc to establish communication. If all pycos instances run on a local network, they find each other automatically through UDP broadcasting. However, pycos instances running on remote network(s) need to be added explicitly with the peer method. It is not necessary to run this method on both sides: if one pycos runs it successfully, the two find each other. By default, all pycos instances running on a single node use the same UDP port, so the peer method needs to be called only once per node, even when more than one pycos instance is running on that node.

loc can be an instance of Location, a host name or an IP address. If it is a host name or IP address, it is treated as a Location with port set to 0.

If stream_send is False, pycos's scheduler may send one message per connection; i.e., the task that transfers messages opens a connection to the peer, sends a message and closes the connection, unless more messages are queued for the same peer (in which case the same connection is used to send them). As creating sockets and making connections is expensive, this may be inefficient. If stream_send is True, the connection to the given peer is not closed after all pending messages are sent; it is kept open and used to send messages as they are queued. This can significantly improve performance in some cases. However, keeping connections alive takes system resources, so this option should be used when delays in message transfer are to be minimized at the cost of keeping sockets open to peers. If stream_send is True and the port of loc is 0, messages to all pycos instances running on that node (if more than one is running) are streamed; if the port is not 0, only messages to the pycos on that TCP port are streamed.

If relay is True, information about the source (the pycos on which the peer method is invoked) is relayed on the network of loc. If source and target are on the same network, as mentioned above, they discover each other. If they are on different networks, the peer method can be called once for each peer on the remote network. However, if there are many peers on the remote network, calling peer with relay set to True for one peer on that network lets all available peers on that network be discovered.

stream_send can be used to enable or disable streaming as and when necessary by calling this method again (with stream_send=True to enable streaming or stream_send=False to disable it), even if the peer has already been discovered.
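A sketch of adding a peer on a remote network with streaming and relay enabled, then switching streaming off later, assuming a remote node at the hypothetical address 192.0.2.10 that uses the same secret.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def connect_remote(host='192.0.2.10', task=None):
    scheduler = pycos.Pycos()  # the singleton scheduler instance
    # A bare host is treated as Location(host, 0): every pycos on that node.
    # stream_send keeps connections open; relay announces this pycos to the
    # other peers on the remote network.
    yield scheduler.peer(host, stream_send=True, relay=True)
    yield task.sleep(60)  # latency-sensitive exchanges would happen here
    # Calling peer again for a known peer switches streaming off.
    yield scheduler.peer(host, stream_send=False)
    print('known peers: %s' % (scheduler.peers(),))  # regular call, no yield
```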

scheduler.peers()

This is a regular method (not a generator, so it is not used with yield) that returns the currently known peers as a list. Each peer is an instance of Location.

scheduler.peer_status(task)

Registers task to receive notifications of peer status. Whenever the pycos scheduler discovers a new peer or a peer disconnects, it sends a PeerStatus instance as a message to task. PeerStatus has 3 attributes: location, an instance of Location; name, the name of the peer; and status, which is PeerStatus.Online if the peer is discovered or PeerStatus.Offline if the peer has disconnected.

If this method is called with None (instead of a task), peer status messages are no longer sent (to any previously registered task).
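A sketch of a task that logs peer status changes, using only the PeerStatus attributes described above. It would be registered with watcher = pycos.Task(peer_watcher) followed by pycos.Pycos().peer_status(watcher), and stopped with peer_status(None).

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def peer_watcher(task=None):
    while True:
        msg = yield task.receive()
        if not isinstance(msg, pycos.PeerStatus):
            continue  # ignore unrelated messages
        state = 'online' if msg.status == pycos.PeerStatus.Online else 'offline'
        print('peer %s at %s is %s' % (msg.name, msg.location, state))
```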

scheduler.send_file(location, file, dir=None, overwrite=False, timeout=None)

Note

This method must be used with yield.

Sends (transfers) the file given by file to the peer at location. If dir is not None, it must be a relative path, in which case the file is saved under the peer's dest_path + dir. This method returns 0 if the file was successfully saved at the peer; 1 if the file exists at the peer with the same size, timestamp and permissions (so there is no need to transfer it); an os.stat structure if the file exists at the destination with a different size/timestamp/permissions but overwrite is False; and -1 in case of any error. If the return value is 0, the sender may later delete the file with del_file().
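The four documented kinds of return value can be told apart with a small helper, shown here with a task that retries with overwrite=True when the peer holds a different copy. describe_send_file_result, push_file and the 'incoming' directory are hypothetical.

```python
import os

try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def describe_send_file_result(result):
    """Map the documented send_file() return values to a short label."""
    if isinstance(result, os.stat_result):
        return 'exists-differs'  # overwrite was False and attributes differ
    if result == 0:
        return 'sent'
    if result == 1:
        return 'up-to-date'  # same size, timestamp and permissions
    return 'error'  # -1 (or anything unexpected)


def push_file(location, path, task=None):
    scheduler = pycos.Pycos()
    result = yield scheduler.send_file(location, path, dir='incoming', timeout=30)
    status = describe_send_file_result(result)
    if status == 'exists-differs':
        # Replace the peer's copy explicitly.
        result = yield scheduler.send_file(location, path, dir='incoming',
                                           overwrite=True, timeout=30)
        status = describe_send_file_result(result)
    return status
```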

scheduler.del_file(location, file, dir=None, timeout=None)

Note

This method must be used with yield.

Deletes the file given by file from the peer at location. file and dir must be the same as those used to send the file (with send_file()). This method returns 0 if the file was successfully deleted and -1 in case of any error.

5.4. Distributed Tasks

When the netpycos module is used instead of the pycos module, the additional methods described below are added to the Task class, so that tasks can be registered with names, located by remote tasks, used for message passing with remote tasks, monitored remotely etc.

task.register(name=None)

A task that can be used for remote services must first register itself with register(name=None). If name is None, the name of the task's generator function is used. The name must be unique at that pycos; registering a different task with the same name fails. If the same name is registered in different pycos instances, locate() can be given the location of a specific pycos to get a reference to the task with that name at that location.

Task.locate(name, location=None, timeout=None)

Note

This method must be used with yield.

This static method finds and returns a reference to the task registered with name. If location is given (e.g., obtained with the scheduler's locate), only the peer running at that location is queried for a task with that name; otherwise, all known peers are queried. If the task is successfully located, the return value is an instance of Task. A remote task reference, rtask, can be used for message passing with its send() and deliver() methods; the monitor() method can be used to receive its exit status when it finishes; and it can be terminated with rtask.terminate().

If a task running at location loc1 registers itself with task.register("server_task"), tasks running at other peers can obtain references to it with rtask = yield Task.locate("server_task", loc1). If “server_task” is unique among all pycos instances, the same reference can be obtained with rtask = yield Task.locate("server_task"). The following methods can then be used with the remote task reference rtask:

rtask.send(msg)

Sends message msg to the task rtask (running at loc1). That remote task receives messages with msg = yield task.receive(). Messages are sent asynchronously: they are queued, and a daemon task sends them in the order submitted. Thus, the caller of rtask.send() cannot determine if or when a message has been sent; in case of network failures, the message may be dropped. rtask.deliver() may be used instead to find out whether the message was delivered. Alternately, the recipient can send an acknowledgment back.
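A minimal server/client pair for the registration, lookup and send() steps above. Each function would run in its own program as pycos.Task(server_task) and pycos.Task(client_task); the message contents are arbitrary.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def server_task(task=None):
    task.register('server_task')  # name used by remote Task.locate()
    while True:
        msg = yield task.receive()
        print('server received %r' % (msg,))


def client_task(loc=None, task=None):
    # With loc=None all known peers are queried; pass loc if the name is
    # registered at more than one pycos.
    rtask = yield pycos.Task.locate('server_task', loc, timeout=5)
    if not isinstance(rtask, pycos.Task):
        return
    for i in range(3):
        rtask.send('message %d' % i)  # asynchronous; delivery not confirmed
```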

rtask.deliver(msg, timeout=None)

Note

This method must be used with yield as recvd = yield rtask.deliver(msg).

Sends message msg to the recipient, waits for acknowledgement from the remote pycos that the message has been put in the recipient's message queue, and returns that status. If the status is 1, the message has been delivered successfully (when the recipient calls receive(), it gets the queued messages in the order they were received). Otherwise (e.g., a timeout occurs before delivery, or the remote task reference rtask is no longer valid), the status returned is 0.
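A sketch that uses deliver() instead of send() and collects the messages that were not acknowledged; reliable_sender is a hypothetical helper and rtask is assumed to come from Task.locate().

```python
def reliable_sender(rtask, messages, task=None):
    undelivered = []
    for msg in messages:
        # 1: queued at the recipient; 0: not delivered (timeout, stale reference)
        status = yield rtask.deliver(msg, timeout=5)
        if status != 1:
            undelivered.append(msg)
    return undelivered  # becomes the task's result
```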

task.monitor(rtask)

Note

This method must be used with yield as status = yield task.monitor(rtask).

Sets task (the task that invoked this method) as a monitor of the remote task rtask. If successful, the value yielded is 0. Then, when rtask terminates (normally or abnormally), its exit status is sent to the monitor task.

task.notify(rtask)

Sets rtask as a monitor of task (the task that invoked this method). If successful, the value yielded is 0. Then, when task terminates (normally or abnormally), its exit status is sent to the monitor rtask.
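A sketch of a task that monitors a remote task and waits for its exit status. The attribute names type and value on the status message are assumptions about pycos' MonitorStatus, not stated in this section.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def watch_remote(rtask, task=None):
    # 0 means this task is now a monitor of rtask.
    if (yield task.monitor(rtask)) != 0:
        print('could not monitor %s' % (rtask,))
        return
    while True:
        msg = yield task.receive()
        if isinstance(msg, pycos.MonitorStatus):
            # Attribute names (type, value) are assumed, not documented here.
            print('remote task finished: %s %r' % (msg.type, msg.value))
            break
```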

rtask.terminate()

Terminates the remote task rtask.

5.5. Distributed Channels

Similar to tasks, channels can be registered so that remote tasks can use them for message passing. A channel, say chan, can be registered with

chan.register(name=None)

so that tasks at peers can obtain a reference to it. If name is None, the name the channel was created with is used for registering. The name must be unique, at least at the pycos where it is registered. If the same name is used at different pycos instances, Channel.locate() should be given the location of the specific peer.

chan.unregister()

A registered channel can be unregistered with chan.unregister(name=None); the name must be the same as the one used to register it.

Channel.locate(name, location=None, timeout=None)

Note

This method must be used with yield as chan = yield Channel.locate("achannel").

Finds and returns a reference to the channel registered with name. If location is given (e.g., obtained with the scheduler's locate), only the peer running at that location is queried; otherwise, all peers are queried. If the channel is successfully located, the return value is an instance of Channel. A remote channel reference can be used for message passing with its send() and deliver() methods.

If a channel, chan, is created and registered at location loc1 with chan.register("server_channel"), tasks running at other peers can obtain references to it with rchan = yield Channel.locate("server_channel", loc1). If “server_channel” is a unique channel name among all peers, the same reference can be obtained without giving loc1: rchan = yield Channel.locate("server_channel"). The following methods can then be used with the remote channel reference rchan:

rchan.send(msg)

Sends message msg to the channel (“server_channel” at loc1), so that current subscribers to that channel receive it. send() sends messages asynchronously: messages are queued, and a daemon task sends them in the order submitted. Thus, the caller of send() cannot determine if or when a message has been sent; in case of network failures, the message may fail to reach the recipients. deliver() may be used instead to find out whether the message was delivered.

rchan.deliver(msg, timeout=None, n=0)

Note

This method must be used with yield as recvd = yield rchan.deliver(msg).

Similar to send(), except that this method must be used with yield and returns the status of delivering the message to the channel. It waits until the channel has at least n subscribers, or until timeout seconds elapse, and then delivers the message to each subscriber. The value returned is the total number of recipients of the message. This can be less than n in case of network failures, timeouts, or subscribers leaving while the message is being delivered, or more than n if there are more than n subscribers. If n is 0, the message is delivered to all current subscribers. In any case, timeout is the maximum amount of time in seconds (possibly fractional) for delivering the message.

rchan.subscribe(subscriber, timeout=None)

Note

This method must be used with yield as n = yield rchan.subscribe(task).

Subscribes subscriber to rchan so that messages sent to rchan are sent to subscriber. If subscriber is a task, it receives such messages with msg = yield task.receive(). Note that messages sent directly to the task by other tasks are also received with the same method, so if it is necessary to distinguish whether a message was received directly or through a channel, messages can be wrapped with appropriate information.

rchan.unsubscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield rchan.unsubscribe(task).

Removes subscriber from rchan's subscribers so it will no longer receive messages sent to rchan.
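A publisher/subscriber sketch covering register, locate, subscribe, deliver with n and unsubscribe. The channel name 'updates', the counts and timeouts are hypothetical; each function would run as a task in its own program.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def publisher(task=None):
    chan = pycos.Channel('updates')
    chan.register()  # name defaults to the channel's name, 'updates'
    for i in range(10):
        # Wait up to 10 s for at least 2 subscribers; returns recipient count.
        n = yield chan.deliver('update %d' % i, timeout=10, n=2)
        print('update %d reached %d subscriber(s)' % (i, n))
    chan.unregister()


def subscriber(loc=None, task=None):
    rchan = yield pycos.Channel.locate('updates', loc, timeout=10)
    if not isinstance(rchan, pycos.Channel):
        return
    yield rchan.subscribe(task)
    for _ in range(10):
        msg = yield task.receive()  # direct messages would arrive here too
        print('got %r' % (msg,))
    yield rchan.unsubscribe(task)
```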

5.6. RPS (Remote Pico/Pycos Service)

As explained above, remote tasks can be sent messages and monitored. This, however, cannot be used to start tasks at remote peers. RPS, similar to Remote Procedure Call (RPC), allows peers to run tasks with registered generator methods. With RPS, an RPS instance is first created with a generator method and registered. Tasks running at remote peers can then obtain a reference to it with the locate() method (similar to distributed tasks and channels). The reference can then be used to start a task with arguments.

class netpycos.RPS(method, name=None)

Creates an RPS instance with the given method. method should be a generator function, similar to task methods; in fact, it is used to create a task when a remote task calls the RPS. If name is not given, the generator method's name is used for registering it. As with tasks and channels, the name must be unique, at least at the pycos where it is registered.

rps.register()

Registers rps (a local instance of RPS) with the scheduler so it can be located (i.e., a reference to it can be obtained) by remote tasks.

Assuming an RPS is created with rps1 = RPS(method1), it can be registered with rps1.register() so remote tasks can locate it (see below) with the name “method1”.
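A server-side sketch: a generator method that replies with the server's time, wrapped in an RPS and registered. The names time_service and 'time_server', the secret, and passing the client's task reference as an ordinary argument are illustrative assumptions. The server program would call serve() before creating any task and then keep running.

```python
import time

try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def time_service(client, task=None):
    # Runs at the server for each client call; 'client' is the caller's
    # task reference, passed as an ordinary argument.
    status = yield client.deliver(time.time(), timeout=5)
    return status  # 1 if the reply reached the client's message queue


def serve(secret='change-me'):
    # Initialize the scheduler first, with the secret shared by all peers.
    pycos.Pycos(name='time_server', secret=secret)
    rps = pycos.RPS(time_service)  # registered under the name 'time_service'
    rps.register()
    return rps
```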

rps.unregister()

Unregisters rps, so that RPS.locate from remote peers fails.

A registered RPS can be located by remote peers (clients of the RPS) to run tasks with the registered method.

RPS.locate(name, location=None, timeout=None)

Note

This method must be used with yield as rps = yield RPS.locate("rps_1").

This class method gets a reference to the RPS (at a remote location) registered with name. location can be used to query a specific pycos peer, and timeout is the number of seconds (possibly fractional) to wait for locating it.

The reference returned by locate() can be used to create (remote) tasks at the peer where the RPS has been registered. If, as above, “method1” is registered at a remote peer at location loc1, a reference to it can be obtained with rps = yield RPS.locate("method1", loc1). If the name “method1” is unique among RPSs at all peer pycos instances, the same can be obtained with rps = yield RPS.locate("method1").

If locate() is successful, the reference obtained can be used to start remote tasks where the RPS has been registered. With the above example, rps can be used as many times as necessary to create tasks at that remote peer with rtask = yield rps(*args, **kwargs). The return value rtask is a reference to the remote task, which can be used for message passing, monitoring etc.

rps.monitor(task)

Adds task as a monitor of tasks created with rps, so that the exit status (as a MonitorStatus message) of every (remote) task created after this method returns is sent to it. This method can be called with multiple tasks if necessary; all such tasks receive status messages.

rps.close()

If a remote rps (instance obtained with locate) is no longer needed, it must be closed.

rps(*args, **kwargs)

Note

This method must be used with yield as rtask = yield rps(...).

Runs a task at the remote peer with *args and **kwargs, as expected by the method of the RPS. If the call is successful, a reference to the remote task (an instance of Task) is returned.
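A client-side counterpart to a time-returning service, assuming an RPS named 'time_service' that delivers its reply to the task reference passed as its first argument; the name and that convention are hypothetical. The client program must create its scheduler with the same secret as the server before starting this task.

```python
try:
    import pycos.netpycos as pycos  # netpycos adds network services to pycos
except ImportError:  # lets this sketch load (not run) without pycos installed
    pycos = None


def time_client(loc=None, task=None):
    rps = yield pycos.RPS.locate('time_service', loc, timeout=10)
    if not isinstance(rps, pycos.RPS):
        print('time_service not found')
        return
    try:
        # Starts a task at the server running time_service(task).
        rtask = yield rps(task)
        if not isinstance(rtask, pycos.Task):
            return
        server_time = yield task.receive(timeout=10)  # None assumed on timeout
        print('server time: %r' % (server_time,))
    finally:
        rps.close()  # a located RPS must be closed when no longer needed
```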

5.6.1. Examples

  • rps_log_watch_client.py and rps_log_watch_server.py use RPS to implement log monitoring where each node running log_monitor_server sends filtered log messages to client. This is useful to observe many nodes for potential issues (e.g., authentication failures).

  • rps_monitor_client.py and rps_monitor_server.py illustrate another approach to executing remote tasks: the server registers a function and the client requests a task to be run with that function. Compare this to dispycos_client.py, where the client sends the computation itself to the remote server, so the client can execute arbitrary functions, whereas with RPS only registered functions can be executed by clients.

  • rps_node_client.py and rps_node_server.py use RPS feature so client can run a script (node_update_script.py in this example) on the server. Authentication is used to prevent unauthorized use of this feature.

  • rps_pico_service.py implements a tiny (pico) service using RPS. This service is used by rps_pico_client.py to get time at server.

  • rps_log_watch_server.py implements a log file watch service that filters log messages matching a pattern (e.g., authentication failures in /var/log/auth.log). The server program can run on each node to be watched. The client rps_log_watch_client.py uses RPS to run this service and get such log messages from all the servers.

  • rps_monitor_server.py uses RPS to implement a function that can be invoked remotely by the client rps_monitor_client.py. The client also uses monitoring to catch errors. Note that this differs from a generic client/server, as the server implements specific functionality that clients can use.

  • rps_node_server.py and rps_node_client.py implement a server that registers an RPS to run scripts sent by clients, and a client that uses that RPS to run tasks.

5.7. SSL (Security / Encryption)

If peers are on public or remote networks, SSL (Secure Sockets Layer) can be used to encrypt all communication, including data, so that the connection is private: only the sender and recipient see the data exchanged, while other observers see only encrypted data. netpycos provides a simple mechanism where a digital certificate and its key are used to encrypt data on one side and decrypt it on the other. Below is a command that generates the certificate using the openssl tool (this is just an example; there are other approaches):

openssl req -x509 -newkey rsa:2048 -keyout sskeycert.pem -nodes -out sskeycert.pem -sha256 -days 1000

This command generates a self-signed key and certificate pair in the file sskeycert.pem, which should be used as the certfile parameter wherever SSL is used. It is also possible to generate the key and certificate in separate files and use the keyfile and certfile parameters accordingly.

Once the key/certificate pair is generated (or obtained by other means), it should be copied to each of the peers (over a secure channel, such as ssh), and the peers created with pycos.Pycos(..., certfile='sskeycert.pem') (or, if the key and certificate are stored in separate files, with pycos.Pycos(..., certfile='sscert', keyfile='sskey')).