6. Distributed Communicating Tasks (dispycos)

The dispycos module provides an API for sending computation fragments (code and data) to remote server processes for executing distributed communicating tasks. Whereas RTI (Remote Task Invocation) provides an API for creating remote tasks with pre-defined generator functions, dispycos’s API provides a generic framework that clients can use to send different computations and create tasks at remote servers. There are three components in dispycos:

  • Node / Servers: the dispycosnode program should be running on each of the nodes that run the servers to execute tasks for clients,
  • Scheduler (dispycos) that schedules client computations, manages nodes, remote tasks etc.,
  • Computation API for clients to create computations, schedule them with the scheduler and run remote tasks.

6.1. Examples

There are many illustrative use cases in ‘dispycos_*.py’ files in the ‘examples’ directory under where pycos module is installed. To use these examples, run ‘dispycosnode.py’ program on one or more of the nodes (most examples use one or two servers) along with an example file. The examples are written to illustrate various features rather than to be simple, and their error checking is not comprehensive. The comments in the programs explain usage and notes.

Compared to dispy project, there are a few additional steps involved in distributing and getting results with pycos/dispycos; however, pycos/dispycos offer many features, such as communicating with computation (even computations communicating among themselves), data streaming, live analytics etc.

Following is a brief description of the examples included relevant to this section:

  • dispycos_client1.py illustrates how to use dispycos to distribute computations to remote servers to run them as tasks on those servers and get results back to client.
  • dispycos_client2.py is a variation of dispycos_client1.py. In this example, http server is used to monitor cluster, nodes, remote tasks.
  • dispycos_client3.py shows how to exchange messages with objects (instances of class) between client and remote tasks.
  • dispycos_client4.py sends files at the client to remote tasks, which execute computations that use those files and in turn send the results back to the client in files.
  • dispycos_client5.py runs an external program (dispycos_client5_proc.py) at remote servers. The program reads from standard input and writes to standard output. Asynchronous pipes and message passing are used to send input from client to this program executing on remote servers, and get the output back to client.
  • dispycos_client6.py uses streaming of data to remote tasks for efficient communication. The example also shows how to implement live/real-time analytics and send them to client.
  • dispycos_client6_channel.py is same as dispycos_client6.py, except it uses channel to broadcast data to remote tasks.
  • dispycos_client7.py is an alternate implementation of dispycos_client1.py; it uses messages from dispycos scheduler to schedule remote tasks and get results.
  • dispycos_client8.py demonstrates that long-running computations that don’t yield often can still be executed. In this case time.sleep is used to simulate computation. Note that time.sleep blocks entire pycos framework so no other tasks can execute until next yield. With version 4.1 (and above) I/O processing, message passing, sending heartbeat messages to scheduler etc. are handled by a separate (called “reactive”) pycos scheduler that is not affected by user’s tasks. So messages sent by client are received and queued by reactive scheduler.
  • dispycos_client9_node.py uses status messages from dispycos scheduler to distribute data files to nodes and run node specific setup task to load the data in memory. This data is then processed in computations to illustrate in-memory processing. This example doesn’t work with Windows (due to lack of ‘fork’ in Windows), so nodes running Windows are filtered out using DispycosNodeAllocate.
  • dispycos_client9_server.py is similar to dispycos_client9_node.py above, except that instead of initializing (memory in) nodes, each server in each node is initialized by distributing one file per server (note that one node may run as many servers as there are processors on that node), which is then read in memory on that server for in-memory processing at server level.
  • dispycos_httpd1.py shows how to use HTTP Server module to provide HTTP interface to monitor dispycos cluster.
  • dispycos_httpd2.py is a variant of dispycos_httpd1.py to use status_task to process messages from dispycos scheduler (in this case just to print when a remote task is finished) while also using HTTP Server (which chains messages from dispycos scheduler to client’s status_proc).
  • dispycos_ssh_ec2.py is a variation of dispycos_client8.py that uses Amazon EC2 for cloud computing; see Cloud Computing for more details.

6.2. Node / Servers

dispycosnode program (dispycosnode.py) is used to start server processes at a node. These server processes are used by dispycos scheduler to run computations submitted by clients. The program, by default, starts one server for each processor available so that CPU intensive computations can utilize all the processors efficiently. Each server runs as a separate process so computations running in one server process don’t interfere with computations in another server process on the same node. However, multiple tasks can be scheduled on one server so that if computations have I/O (such as communicating with other computations / client, or reading/writing data), pycos can run another task that is ready to execute on that server process. All tasks running in one server process share the address space, run on one CPU; however, as pycos doesn’t pre-empt a running task until yield is used, there is no need to use locking critical sections, as done with threads. If all computations are same and do not need to communicate with each other, then dispy project can be used.
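The point about not needing locks can be illustrated without pycos at all. The following is a minimal sketch using plain Python generators and a hypothetical round-robin driver (run_round_robin and deposit are illustrative names, not part of pycos); it only models the idea that a task is never interrupted between two yields.

```python
def run_round_robin(tasks):
    """Run generator-based tasks one step at a time until all finish."""
    pending = list(tasks)
    while pending:
        for task in list(pending):
            try:
                next(task)            # runs until the task's next yield
            except StopIteration:
                pending.remove(task)  # task has finished


def deposit(account, amount, times):
    for _ in range(times):
        # Read-modify-write with no yield in between: no other task can
        # run inside this section, so no lock is needed.
        balance = account["balance"]
        account["balance"] = balance + amount
        yield                         # hand control back to the driver


account = {"balance": 0}
run_round_robin([deposit(account, 1, 1000), deposit(account, 2, 1000)])
print(account["balance"])  # prints 3000
```

With pre-emptive threads the same read-modify-write would need a lock, because a thread switch could happen between reading and writing the balance.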

The program takes following options to customize how the servers are started.

  • -c option specifies number of instances of dispycos server processes to run. Each server process uses one processor. If -c option is used with a positive number, then that many processors can be used in parallel; if it is negative number, then that many processors are not used (from available processors) and if it is 0 (default value), then all available processors are used.

  • -i or --ip_addr is same as node option to Pycos; this option can be repeated for multihome networking

  • --ext_ip_addr is same as ext_ip_addr option to Pycos; this option can be repeated for multihome networking

  • -u or --udp_port is same as udp_port option to Pycos

  • -n or --name is same as name option to Pycos

  • --dest_path is same as dest_path option to Pycos

  • --max_file_size is same as max_file_size option to Pycos

  • -s or --secret is same as secret option to Pycos

  • --certfile is same as certfile option to Pycos

  • --keyfile is same as keyfile option to Pycos

  • --min_pulse_interval n specifies minimum pulse interval that can be given as pulse_interval by client computations. The default value is MinPulseInterval defined (as 10 seconds) in dispycos module. As nodes send availability status (CPU, memory and disk availability), the clients may want this information more frequently than at MinPulseInterval, in which case, smaller value can be specified with this option.

  • --max_pulse_interval n specifies maximum pulse interval that can be given as pulse_interval by client computations. See min_pulse_interval above.

  • --zombie_period n is maximum number of seconds that dispycosnode remains idle (i.e., doesn’t run any computations for current client) before the node closes current computation so another computation may use node.

  • -d or --debug option enables debug log messages.

  • --tcp_ports n1-n2 or --tcp_ports n option can be used to specify list of TCP ports to be used by servers. Without this option a server uses any available (dynamic) port, which can be a problem with remote servers or cloud computing that require firewall to be configured to forward ports. With tcp_ports, specific range of ports can be used and those ports can be configured for port forwarding. The range can be given either as n1-n2, in which case ports from n1 to n2 (both inclusive) will be used, or as single number n. tcp_ports can be used as many times as necessary to add different ranges, or different ports. If number of ports listed is less than number of servers (based on -c option, or number of available CPUs), dispycosnode will use ports beyond the highest listed port; thus, if number of servers to start is 8, and --tcp_ports 2345-2347 --tcp_ports 3799 is given, then servers will use ports 2345, 2346, 2347, 3799, 3800, 3801, 3802, 3803, or if, say, just --tcp_ports 51347 is used, then ports from 51347 to 51354 are used.

  • --serve n option indicates number of computations/clients to serve before exiting. The default value of -1 implies no limit and any positive number causes dispycosnode to quit after running that many computations. This option can be used with Docker Container to run each computation in a new container, so one computation starts with the same environment the docker image was built with.

  • --daemon, if given, indicates that dispycosnode shouldn’t read standard input. Without this option and started with command-line, dispycosnode offers a choice of commands that can be input to get status or quit. Currently supported commands are:

    • “status” shows status of each of the processes, such as number of tasks being executed for a computation
    • “close” closes currently executing computation (if any). This is equivalent to computation calling “close” method. If any tasks are being executed for that computation, they will be killed. A new computation can then use the server.
    • “quit” or “exit” command causes dispycosnode to stop accepting any more tasks and when all tasks are done, closes the computation and quits.
    • “terminate” kills currently executing tasks, closes computation and quits.
  • --service_start HH:MM, --service_stop HH:MM and --service_end HH:MM set time of day when dispycos service can be used by clients. The time is given as hours of day in 24-hour format, a colon and minutes of hour. If start time is not given, current time is assumed as start time. Either stop time or end time must also be given. After stop time new jobs will not be accepted, but currently running jobs will continue to execute. At end time any running jobs will be killed. For example, if service_start is set to 17:00, service_stop set to 07:00 and service_end is set to 08:00, the node will execute jobs from 5PM, stop accepting new jobs at 7AM (next day), and kill any running jobs at 8AM. Then it will not accept any jobs until 5PM.

  • --clean indicates that dispycosnode should kill any previous server processes left running. When dispycosnode server processes are started, they store process ID in files on file system. When the server processes quit, they remove the files. If the processes are terminated, for example, by killing them explicitly or due to an unforeseen exception, then the files are left behind and the next time the server processes notice the files from previous run and refuse to start. clean option can be used to request the server process to kill previous servers. This should be done only if the previous process is not running, or if it is acceptable to kill the process in case it is running.

  • --peer location requests dispycosnode to contact given peer to establish communication with it. location should be given in the form node:port where node is either host name or IP address and port is TCP port where peer is running. This option can be used multiple times to detect multiple peers.

    A typical use case where client’s computations don’t communicate with computations on other dispycosnodes (i.e., each task executing on a dispycosnode communicates with tasks at client or other local tasks executing on that dispycosnode only) can be implemented with running each dispycosnode without discover_peers and peer options, and then start scheduler / client. (The scheduler and client use discover_peers so they detect all available dispycosnode servers, even the ones started without discover_peers option.)

    If scheduler is already running, peer option can be used with location where scheduler is running so that scheduler and new dispycosnode server can detect each other; alternately, discover_peers can be used, but in that case other dispycosnode servers will also detect new server.

  • --save_config <file> saves configuration (various options to start dispycosnode) in given file and exits. See --config below.

  • --config <file> loads configuration (various options to start dispycosnode) from given file (saved with --save_config option).

    --save_config can be used to save most of the options (certain options, such as ip_addr are unique to a node) and --config can be used to run dispycosnode with those options on many nodes.
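The arithmetic behind the -c and --tcp_ports options described above can be sketched in plain Python. The functions below are hypothetical helpers written for illustration (they are not taken from dispycosnode.py), and they assume that surplus ports are ignored and that no bounds checking is done on -c.

```python
def server_count(c, available_cpus):
    """Number of servers implied by a -c value (illustrative only)."""
    if c > 0:
        return c                      # use that many processors
    if c < 0:
        return available_cpus + c     # leave -c processors unused
    return available_cpus             # 0 (default): use all processors


def expand_tcp_ports(specs, servers):
    """Expand --tcp_ports values such as "2345-2347" or "3799".

    specs holds one string per use of the option. An empty list models
    the default, where servers use any available (dynamic) port.
    """
    ports = []
    for spec in specs:
        if "-" in spec:
            first, last = (int(part) for part in spec.split("-"))
            ports.extend(range(first, last + 1))   # both ends inclusive
        else:
            ports.append(int(spec))
    if not ports:
        return []
    # Fewer ports than servers: continue beyond the highest listed port.
    next_port = max(ports) + 1
    while len(ports) < servers:
        ports.append(next_port)
        next_port += 1
    return ports[:servers]            # assumption: extra ports are ignored


servers = server_count(0, 8)
print(expand_tcp_ports(["2345-2347", "3799"], servers))
print(expand_tcp_ports(["51347"], servers))
```

With 8 servers, the first call yields the three ports of the range, then 3799, then the four ports that follow 3799; the second call yields 51347 through 51354.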

6.3. Scheduler

dispycos scheduler schedules computations, keeps track of available nodes, server processes etc. Client computations, constructed with Computation, are sent to the scheduler. The scheduler queues the client computations and processes them one at a time, each until the computation closes. Even if currently scheduled computation is not utilizing all the servers, the scheduler doesn’t schedule any other computations, so that one computation’s state doesn’t interfere with another computation’s. That is, currently scheduled computation reserves all the available servers.

Scheduler can be started either as a separate program, or from within the client program. If multiple client programs can schedule computations simultaneously, scheduler should be started as a program on a node; then the scheduler will schedule them in the order submitted.

6.3.1. Private/Local Scheduler

If only one client uses the nodes, then it is easier to start the scheduler in dispycos module with:

Scheduler()

in the client program before computation is scheduled with compute.schedule() (see below). There is no need to hold reference to the object created, as the methods of scheduler are not to be used directly by the client; instead, computation created with Computation should be used to run remote tasks. If necessary, options used in Pycos can be passed, such as node to specify host name or IP address to use for distributed computing. When the scheduler starts, it detects dispycos nodes and servers on the configured network. See ‘dispycos_client*.py’ files in examples directory for use cases.

6.3.2. Remote/Batch Scheduler

If more than one client may use the scheduler simultaneously, scheduler should be started by running the dispycos program (dispycos.py). The program takes following options (same as dispycosnode, except for starting servers):

  • -i or --ip_addr is same as node option to Pycos; this option can be repeated for multihome networking
  • --ext_ip_addr is same as ext_ip_addr option to Pycos; this option can be repeated for multihome networking
  • -u or --udp_port is same as udp_port option to Pycos
  • -t or --tcp_port is same as tcp_port option to Pycos
  • -n or --name is same as name option to Pycos
  • --node <host name or IP> can be used as many times as necessary to list name or IP address of nodes in remote networks. Nodes in local network are automatically found, so there is no need to list them with this option. Moreover, listing only one node per remote network should be enough - pycos finds all nodes in a network by broadcasting.
  • --dest_path is same as dest_path option to Pycos
  • --max_file_size is same as max_file_size option to Pycos
  • -s or --secret is same as secret option to Pycos
  • --certfile is same as certfile option to Pycos
  • --keyfile is same as keyfile option to Pycos
  • --zombie_period=sec specifies maximum number of seconds a remote server process can stay idle before it closes computation. The default value is 10*MaxPulseInterval, which is 1000 seconds. Once all servers used by a computation close, the computation is discarded so other pending (queued) computations can be run. If zombie_period is set to 0, then idle check is not done, so computations are not automatically closed.
  • -d or --debug option enables debug log messages.
  • --daemon, if given, indicates that dispycos scheduler shouldn’t read standard input. Starting the scheduler as background process (i.e., with & in Unix, for example) implies daemon. If not a daemon, the scheduler can be terminated with “quit” or “exit” commands.

When remote scheduler is running on a computer in local network, Computation.schedule() will locate the scheduler automatically. If the scheduler is in remote network, scheduler.peer() method of Pycos should be used so pycos can locate the scheduler.

Note that dispycos scheduler runs jobs for at most one computation at any time. Other computations are queued and wait for their turn in the order submitted; when currently running computation finishes, next computation in queue is made active so its jobs can be run.
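The queueing behaviour described above can be modelled with a few lines of plain Python. This is a toy sketch only: ToyScheduler and its methods are hypothetical names and do not reflect how the dispycos scheduler is implemented.

```python
from collections import deque


class ToyScheduler:
    """Toy model of a scheduler that runs one computation at a time."""

    def __init__(self):
        self.active = None        # computation currently using the servers
        self.pending = deque()    # computations waiting, in submission order

    def schedule(self, name):
        """Return True if the computation becomes active immediately."""
        if self.active is None:
            self.active = name
            return True
        self.pending.append(name)
        return False

    def close(self, name):
        """Close the active computation and activate the next one, if any."""
        if name != self.active:
            raise ValueError("only the active computation can be closed here")
        self.active = self.pending.popleft() if self.pending else None


scheduler = ToyScheduler()
started = [scheduler.schedule(name) for name in ("A", "B", "C")]
order = []
while scheduler.active is not None:
    order.append(scheduler.active)
    scheduler.close(scheduler.active)
print(started, order)  # prints [True, False, False] ['A', 'B', 'C']
```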

Examples dispycos_* use local scheduler with Scheduler() assuming that no other clients may be using nodes. If multiple clients share nodes, that statement should be removed (or commented) and dispycos.py program must be running on a machine in local network. It is also possible to use scheduler running in a remote network in which case peer method of Pycos should be used to establish communication first.

6.4. Computation

dispycos module’s Computation provides API for client programs to package computation fragments, send them to scheduler, and submit tasks to be executed at remote server processes to execute distributed communicating processes. A computation’s jobs (remote tasks) at a remote server run with only the components packaged. A remote task can expect to have only pycos module available; any other modules, global variables etc. need to be initialized appropriately. If necessary, initialization can be done by scheduling a job on remote servers, e.g., to read data in a file, before running other jobs, which can expect the side effects of setup job. See dispycos_*.py files in examples directory (under where pycos module is installed) for different use cases. All tasks at a server process (if more than one job is scheduled concurrently) run in one thread / processor, share the address space, and run concurrently.

Computation(self, components, pulse_interval=(5*MinPulseInterval), node_allocations=[], status_task=None, node_setup=None, server_setup=None, disable_nodes=False, disable_servers=False, peers_communicate=False):
  • components must be a list, each element of which can be either a Python function, class, module, or path to a file. These computation fragments are sent to dispycos servers for execution. Once the computation is scheduled (i.e., schedule method is finished), generator functions can be run on the nodes / servers. These jobs can use the components packaged.

    If a component is a file, it is stored in (remote) node’s directory $TMPDIR/pycos/dispycos, where $TMPDIR is as returned by Python’s tempfile.gettempdir(), whereas jobs scheduled later on servers are executed in server’s directory dispycosproc-$n under node’s directory, where $n is a number from 1 to number of servers executing on that node. Thus, a computation can access a file transferred with components (and by node_available) at parent directory of current working directory - see dispycos_client9_node.py for an example where a file is transferred to nodes. Python’s sys.path is set to include both server process’s directory and node directory so loading modules works without additional steps.

    If a component is a Python code fragment (function, class), then it is sent once to the node and all the servers are initialized with these definitions before any jobs are executed. It is not necessary for all functions used later to submit jobs to be listed in components; however, the scheduler will then send definitions (code) for such functions each time, which is less efficient. So whenever possible, listing all definitions in components is advised.

  • pulse_interval is the interval, in seconds, at which pulse messages are exchanged among servers and client to check for faults. If no messages are received within (5 * pulse_interval), the remote end is assumed to be faulty and it is closed. This value must be at least MinPulseInterval and at most MaxPulseInterval. Both of these values are defined in dispycos module. The default value of pulse_interval is 5*MinPulseInterval, as given in the signature above.

    If nodes have psutil module installed, they send node availability status (CPU available in percent, memory in bytes and disk space in bytes) as an instance of DispycosNodeAvailInfo at pulse_interval frequency. This information is useful for monitoring application performance, filtering nodes for required resources etc. This information is shown in web browser if HTTP Server is used.

  • node_allocations should be a list of DispycosNodeAllocate instances. When a node is discovered, the scheduler executes allocate method of each allocation with ip_addr, name, platform, cpus, memory, disk arguments. If the method returns a positive number, that many cpus are used on that node. If return value is 0, then that node is not used and if the return value is negative number, then next allocation in node_allocations is applied. See, for example, dispycos_client9_node.py that filters out nodes running Windows as this example doesn’t work on such nodes.

    If necessary, DispycosNodeAllocate can be sub-classed to override allocate method and objects of that sub-class can be passed to node_allocations. However, this works only with private scheduler (i.e., scheduler created in the client program), but not with shared scheduler.

  • status_task, if given, must be a task. If it is a task, dispycos scheduler sends status of remote servers, jobs executed etc., to this task as messages. These messages can be used to schedule jobs.

  • node_setup, if given, must be a generator function. This function is executed at remote dispycos node to prepare the node before server processes are created. If the task finishes with value 0, the setup is assumed to be successful and node process creates server process for that computation. Otherwise, node is not used for that computation.

    If node_setup needs to be executed with node-specific arguments, then disable_nodes=True and status_task can be used to call enable_node when NodeDiscovered message is received. node_setup is executed in node’s directory $TMPDIR/pycos/dispycos (where $TMPDIR is as returned by Python’s tempfile.gettempdir()). See dispycos_client9_node.py for an example.

    node_setup runs in a process that doesn’t have networking enabled, so this function can’t communicate with client / other peers. node_setup is not executed on nodes running Windows.

  • server_setup, if given, must be a generator function. This function is executed by each server process on each node (note that a node may run as many server processes as there are processors available on that node). Similar to node_setup, this function is meant to initialize server; disable_servers=True and status_task can be used to call enable_server to pass arguments specific to each server if necessary. See dispycos_client9_server.py for an example.

  • disable_nodes is by default False, in which case the scheduler will initialize a node when it becomes available (the node will execute node_setup if it is set, but without any parameters; i.e., node_setup should not have any formal parameters, except for task=None keyword argument). If disable_nodes is True, the scheduler will not initialize the node. If Computation has set status_task, then the scheduler will send NodeDiscovered message to it. The status_task can then call enable_node method as appropriate (with any additional parameters required to call node_setup). See dispycos_client9_node.py where disable_nodes is used and status_task is used to call enable_node with parameter required to call node_setup.

  • disable_servers is by default False, in which case each (remote) server process will initialize itself as soon as it is started by node. If disable_servers is True, the server will wait until Computation enables it with enable_server (with any additional parameters required to call server_setup). See dispycos_client9_server.py where disable_servers is used and status_task is used to call enable_server with parameter required to call server_setup.

  • peers_communicate requests dispycos scheduler to inform each remote server (peer) about other servers so computations executing on these servers can communicate. Without this option (default), a computation executing on a remote server can communicate with the client only - servers are not aware of each other.

    Using this option causes each server to establish communication with each other server; this can be expensive, especially if there are many servers in the cluster.
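The way node_allocations is consulted (positive value: use that many CPUs; 0: don’t use the node; negative value: try the next allocation) can be sketched as follows. The classes below are hypothetical stand-ins that only mimic the allocate signature described above; they are not DispycosNodeAllocate, and treating a node that no allocation claims as unused is an assumption of this sketch.

```python
class SkipWindows:
    """Hypothetical allocation that rejects nodes running Windows."""

    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        if platform.lower().startswith("windows"):
            return 0       # 0: node is not used
        return -1          # negative: defer to the next allocation


class UseAtMost:
    """Hypothetical allocation that caps the number of CPUs used."""

    def __init__(self, limit):
        self.limit = limit

    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        return min(cpus, self.limit)   # positive: use that many CPUs


def cpus_for_node(allocations, ip_addr, name, platform, cpus, memory, disk):
    """Apply allocations in order until one gives a non-negative answer."""
    for allocation in allocations:
        count = allocation.allocate(ip_addr, name, platform, cpus, memory, disk)
        if count >= 0:
            return count
    return 0   # assumption: a node no allocation claims is left unused


allocations = [SkipWindows(), UseAtMost(4)]
linux = cpus_for_node(allocations, "10.0.0.5", "n1", "Linux", 8, 16 << 30, 100 << 30)
windows = cpus_for_node(allocations, "10.0.0.6", "n2", "Windows", 8, 16 << 30, 100 << 30)
print(linux, windows)  # prints 4 0
```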

    The computation created as, for example, compute = Computation(...), has following methods:

    compute.schedule(location=None)

    Note

    This method must be used with yield as result = yield compute.schedule().

    Schedule computation for execution. If scheduler is remote and executing other computations, this method will block until those computations close. If successful, result will be 0.

    location, if not None, should be a Location instance referring to where the scheduler is running. If it is None, scheduler must be running at a node in local network; pycos will use name resolution to find the scheduler (a task) with the registered name “dispycos_scheduler”.

    Note

    All the run* methods below take generator function and arguments used to create tasks (jobs) at a remote server. If the generator function used in these methods is given as one of the components used to create computation, the code for the function is transferred to the servers once during initialization (which is a bit more efficient); otherwise, the code is transferred to the servers each time a run* method is called.

    compute.run_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as rtask = yield compute.run_at(...).

    Run given generator function gen with arguments args and kwargs at where; i.e., create a task at a server with the given generator function and arguments. If the request is successful, rtask will be a (remote) task; check result with isinstance(rtask, pycos.Task). The generator is expected to be (mostly) CPU bound and until this is finished, another CPU bound task will not be submitted at the same server.

    If where is a string, it is assumed to be IP address of a node, in which case the task is scheduled at a server at that node. If where is a Location instance, it is assumed to be server location in which case the task is scheduled at that server.

    gen must be generator function, as it is used to run task at remote location.

    args and kwargs must be serializable.

    compute.run(gen, *args, **kwargs)

    Note

    This method must be used with yield as rtask = yield compute.run(...).

    Similar to run_at, except that the task is executed at a server on a node with least load.

    compute.run_result_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as result = yield compute.run_result_at(where, gen, ...).

    Similar to run_at method, except that instead of returning reference to remote task, the call blocks until remote task is finished and its value (either the last value ‘yield’ed in the task or value of raise StopIteration) is returned.
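The rule for a task’s value can be sketched with a plain generator driver. This is a simplified illustration, not pycos code: task_value is a hypothetical helper that just steps a generator with next(), and giving the StopIteration value precedence over the last yielded value is an assumption. In Python 3, return value inside a generator is what raises StopIteration(value); since Python 3.7 an explicit raise StopIteration inside a generator is turned into RuntimeError (PEP 479).

```python
def task_value(gen):
    """Drive a generator to completion and report its 'value'."""
    last_yielded = None
    while True:
        try:
            last_yielded = next(gen)
        except StopIteration as exc:
            # Prefer the value carried by StopIteration, if there is one;
            # otherwise fall back to the last value yielded.
            return exc.value if exc.value is not None else last_yielded


def with_return(n):
    yield n
    return n * 2          # equivalent to raising StopIteration(n * 2)


def without_return(n):
    yield n
    yield n + 1           # last value yielded becomes the task's value


print(task_value(with_return(5)), task_value(without_return(5)))  # prints 10 6
```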

    compute.run_result(gen, *args, **kwargs)

    Note

    This method must be used with yield as result = yield compute.run_result(gen, ...).

    Similar to run_result_at method, except the task may run on any available server.

    compute.run_results(gen, iter)

    Note

    This method must be used with yield as results = yield compute.run_results(gen, ...).

    Runs run_result method for each item in given iterable iter. The return value is list of results that correspond to executing gen with items in iterable in the same order. If gen takes multiple arguments, each item can be given as tuple (i.e., iter would be list of tuples).
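The ordering and tuple-unpacking behaviour described for run_results can be imitated locally. The sketch below deliberately swaps in a thread pool from the standard library and plain functions instead of dispycos servers and generator functions; local_run_results is a hypothetical name used only to show the shape of the inputs and outputs.

```python
from concurrent.futures import ThreadPoolExecutor


def local_run_results(func, items, workers=4):
    """Call func once per item and return results in the order of items."""

    def call(item):
        # A tuple item supplies multiple positional arguments.
        if isinstance(item, tuple):
            return func(*item)
        return func(item)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in input order, even if the calls
        # finish in a different order.
        return list(pool.map(call, items))


def power(base, exponent):
    return base ** exponent


def square(x):
    return x * x


print(local_run_results(power, [(2, 3), (3, 2), (10, 0)]))  # prints [8, 9, 1]
print(local_run_results(square, [1, 2, 3]))                 # prints [1, 4, 9]
```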

    compute.run_async_at(where, gen, *args, **kwargs)

    Note

    This method must be used with yield as rtask = yield compute.run_async_at(where, gen, ...).

    Similar to run_at method, except that the task will run on some server, even if it is currently running other tasks. gen is supposed to be not CPU bound (i.e., the task should be mostly waiting for asynchronous operations - I/O events or messages), so running this will not impede other tasks. dispycos scheduler will not “track” these asynchronous tasks; when computation is closed, scheduler waits for other remote tasks scheduled with run_at, run_result_at etc. before closing a server, but not for async remote tasks.

    compute.run_async(gen, *args, **kwargs)

    Note

    This method must be used with yield as rtask = yield compute.run_async(gen, ...).

    Similar to run_async_at method, except the task may run on any server.

    compute.nodes()

    Note

    This method must be used with yield as nodes = yield compute.nodes().

    Returns list of addresses of nodes used / available for computation.

    compute.servers()

    Note

    This method must be used with yield as servers = yield compute.servers().

    Returns list of locations of servers used / available for computation.

    compute.close()

    Note

    This method must be used with yield as yield compute.close().

    Requests scheduler to close computation. If any remote tasks are still pending (except ones created with run_async or run_async_at), this method will block until those tasks are finished. Closing computation causes each server process to remove any files saved or created at that server, remove global variables created by the jobs etc. If the scheduler is shared (i.e., dispycos.py is run as external program), next computation waiting to be scheduled will be allowed to use the cluster to run remote tasks.

The remote task rtask obtained with run methods above can be used for message passing or terminated; see Distributed Tasks. Although it can also be monitored with yield rtask.monitor(task), dispycos scheduler monitors all tasks created with run methods and sends MonitorException message to status_task about remote tasks.

6.5. DispycosStatus

When dispycos scheduler changes state of nodes, servers or tasks, it sends the changes as messages to computation’s status_task, if it is initialized to a task before the computation is scheduled. This is so status_task can schedule tasks as and when servers are available, for example. Each message is an instance of DispycosStatus, with attributes status and info:

  • status is either NodeDiscovered, NodeInitialized, NodeClosed, ServerDiscovered, ServerInitialized, ServerClosed, TaskCreated, or ComputationClosed.

  • info depends on status: if status is for a node (i.e., one of NodeDiscovered, NodeInitialized, NodeClosed), then info is an instance of DispycosNodeAvailInfo for the node; if status is for a server (i.e., one of ServerDiscovered, ServerInitialized, ServerClosed), then info is an instance of Location; if status is TaskCreated (indicating a task has been created at remote server), then info is an instance of TaskInfo with following attributes:

    • task is (remote) task (instance of Task),
    • *args is arguments used to create task,
    • **kwargs is keyword arguments used to create task,
    • start_time is time when task was created.
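How a status_task might react to these messages can be sketched without pycos. Everything below is a stand-in made up for illustration: Status, StatusMsg and Location only imitate the shape (status, info) described above, and track is a plain function rather than a pycos task receiving messages.

```python
from collections import namedtuple
from enum import Enum

# Hypothetical stand-ins; the real DispycosStatus, Location and status
# constants are defined by pycos / dispycos.
Status = Enum("Status", "ServerInitialized ServerClosed TaskCreated ComputationClosed")
StatusMsg = namedtuple("StatusMsg", ["status", "info"])
Location = namedtuple("Location", ["addr", "port"])


def track(messages):
    """Keep a set of usable servers and count tasks created."""
    servers = set()
    tasks_created = 0
    for msg in messages:
        if msg.status is Status.ServerInitialized:
            servers.add(msg.info)          # info is the server's location
        elif msg.status is Status.ServerClosed:
            servers.discard(msg.info)
        elif msg.status is Status.TaskCreated:
            tasks_created += 1             # info would describe the task
        elif msg.status is Status.ComputationClosed:
            break                          # nothing more to process
    return servers, tasks_created


first, second = Location("10.0.0.2", 9706), Location("10.0.0.3", 9706)
messages = [
    StatusMsg(Status.ServerInitialized, first),
    StatusMsg(Status.ServerInitialized, second),
    StatusMsg(Status.TaskCreated, None),
    StatusMsg(Status.ServerClosed, first),
    StatusMsg(Status.ComputationClosed, None),
]
servers, tasks_created = track(messages)
print(servers == {second}, tasks_created)  # prints True 1
```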

6.6. DispycosNodeAvailInfo

A node’s availability information is sent to Computation’s status_task at pulse_interval frequency with an instance of DispycosNodeAvailInfo, which has 5 read-only attributes:

  • location is instance of Location where node’s task is running. This location is of interest only in node_available (see Computation above) to send files to node. Elsewhere, location’s addr part (which is IP address of node) can be used for allocating CPUs / filtering nodes (with DispycosNodeAllocate), maintaining status information etc.
  • cpu is available CPU as percent. If it is close to 100, the node is not busy at all, and if it is close to 0, the node is rather busy (running compute-intensive tasks on all CPUs). This field is set only if psutil module is available on the node; otherwise it is set to None.
  • memory is available memory in bytes. This is not total memory, but usable memory, as interpreted by psutil module. This field is set only if psutil module is available on the node; otherwise it is set to None.
  • disk is available disk space in bytes for the partition that dispycosnode uses as given by dest_path option (where client’s files are saved by dispycosnode and jobs are run). This field is set only if psutil module is available on the node; otherwise it is set to None.
  • swap is available swap space in bytes on the node. This field is set only if psutil module is available on the node; otherwise it is set to None.
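
As an illustration of how these attributes might be used, the sketch below picks the least busy node among those reporting enough memory. AvailInfo and least_busy are hypothetical names made up for this example (AvailInfo only mimics the five attributes listed above, with a plain string in place of a Location), and skipping nodes that report None is an assumption of this sketch, not documented scheduler behavior.

```python
import collections

# Stand-in with the five attributes described above (hypothetical type).
AvailInfo = collections.namedtuple('AvailInfo',
                                   ['location', 'cpu', 'memory', 'disk', 'swap'])


def least_busy(infos, min_memory=0):
    """Return the entry with the highest available CPU percent among those
    with at least min_memory bytes available, or None if there is none.
    Entries without psutil data (cpu / memory of None) are skipped."""
    usable = [info for info in infos
              if info.cpu is not None and info.memory is not None
              and info.memory >= min_memory]
    if not usable:
        return None
    return max(usable, key=lambda info: info.cpu)


GB = 1024 ** 3
infos = [
    AvailInfo('10.0.0.1', 35.0, 8 * GB, 100 * GB, 0),
    AvailInfo('10.0.0.2', 90.0, 2 * GB, 50 * GB, 0),
    AvailInfo('10.0.0.3', None, None, None, None),  # node without psutil
]
best = least_busy(infos, min_memory=4 * GB)
print(best.location)
```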

6.7. DispycosNodeAllocate

If a cluster has nodes with different resources and computation requires specific resources, then the computation can use node_allocations list with Computation to control allocation of nodes / servers. Each element in the list must be an instance of DispycosNodeAllocate with following attributes:

  • node must be either node’s name or an IP address or a regular expression of IP address to match. If it is name, it is resolved to IP address.

    The default allocate method will choose an allocation only if a node’s IP address matches this field.

  • platform must be a Python regular expression. For a computation to use a node, given expression must occur in its platform string, obtained by platform.platform() on the node, ignoring case. Default value is '', which matches any platform. For example, linux.*x86_64 accepts only nodes that run 64-bit Linux.

  • cpus must be an integer. If it is a positive number, then a node must have at least that many servers enabled to be used for the computation. If it is 0 (default), a node with any number of servers is accepted (i.e., no constraints on number of servers).

  • memory must be an integer. If it is a positive number, then a node must have at least that many bytes of memory to be used for the computation. If it is 0 (default), a node with any number of bytes is accepted (i.e., no constraints on amount of memory).

  • disk must be an integer. If it is a positive number, then a node must have at least that many bytes of disk space on the partition used by dispycos to be used for the computation. If it is 0 (default), there are no constraints on amount of available disk space on dispycosnode’s partition. This field may be None if node doesn’t have psutil module.
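
The checks described by these attributes can be approximated in a few lines. This is a sketch under stated assumptions, not the code used by dispycos: Constraint and node_matches are hypothetical names, the node field is treated as a regular expression matched against the IP address, and a node that reports None for memory or disk is assumed to fail a positive minimum.

```python
import collections
import re

# Hypothetical container for the five attributes described above.
Constraint = collections.namedtuple('Constraint',
                                    ['node', 'platform', 'cpus', 'memory', 'disk'])


def node_matches(constraint, ip_addr, platform, cpus, memory, disk):
    """Return True if a node satisfies every field of constraint."""
    if constraint.node and not re.match(constraint.node, ip_addr):
        return False
    # the expression may occur anywhere in the platform string, ignoring case
    if constraint.platform and not re.search(constraint.platform, platform,
                                             re.IGNORECASE):
        return False
    if constraint.cpus > 0 and cpus < constraint.cpus:
        return False
    # memory / disk may be None without psutil; assumed here to fail a minimum
    if constraint.memory > 0 and (memory is None or memory < constraint.memory):
        return False
    if constraint.disk > 0 and (disk is None or disk < constraint.disk):
        return False
    return True


linux64 = 'Linux-5.15.0-generic-x86_64-with-glibc2.35'
windows = 'Windows-10-10.0.19045-SP0'
want = Constraint(r'192\.168\.1\..*', 'linux.*x86_64', 2, 0, 0)
print(node_matches(want, '192.168.1.7', linux64, 4, None, None))
print(node_matches(want, '192.168.1.8', windows, 4, None, None))
```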

This class provides allocate method that is called by scheduler when a node is available. The method is called with arguments ip_addr, name, platform, cpus, memory, disk, where ip_addr is IP address of node, name is node’s name (either given with --name option to dispycosnode or node’s host name as obtained by socket.gethostname()), platform is as obtained by platform.platform() on the node, cpus is number of CPUs available, and memory and disk are available memory and available disk space (see above) if psutil module is available or None otherwise.

This method should return number of CPUs to allocate. If the return value is

  • positive number, that many CPUs are allocated (this number should be less than or equal to cpus called with),
  • 0, the node is not used for this computation,
  • negative number, then this allocation is ignored and next item in node_allocations list is applied.
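
The meaning of the three kinds of return values can be illustrated with plain functions. In this sketch, allocate_small and accept_all are hypothetical policies taking the same arguments as allocate, and resolve is only a guess at how a list of allocations would be walked (first non-negative result decides); returning 0 when no entry applies is an assumption of this example. In a client program, such a policy would be provided as the allocate method of a DispycosNodeAllocate subclass.

```python
def allocate_small(ip_addr, name, platform, cpus, memory, disk):
    """Hypothetical policy: applies to Linux nodes only, leaves one CPU free."""
    if 'linux' not in platform.lower():
        return -1        # ignore this allocation; next item in list is applied
    if cpus < 2:
        return 0         # node is not used for this computation
    return cpus - 1      # positive: that many CPUs are allocated


def accept_all(ip_addr, name, platform, cpus, memory, disk):
    """Hypothetical fallback policy: use every available CPU."""
    return cpus


def resolve(allocations, *node):
    """Apply allocations in order; the first non-negative result decides."""
    for allocate in allocations:
        n = allocate(*node)
        if n >= 0:
            return n
    return 0  # assumption: if no item applies, the node is not used


policies = [allocate_small, accept_all]
print(resolve(policies, '10.0.0.1', 'n1', 'Linux-5.15-x86_64', 4, None, None))
print(resolve(policies, '10.0.0.2', 'n2', 'Windows-10', 4, None, None))
print(resolve(policies, '10.0.0.3', 'n3', 'Linux-5.15-x86_64', 1, None, None))
```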

6.8. HTTP Server

pycos.httpd module provides HTTP interface to monitor and manage dispycos servers (nodes and servers) with a web browser; it works with common web browsers, including in iOS and Android devices. It doesn’t require / use apache or other web servers. HTTP server can be created with:

class HTTPServer(computation, host='', port=8181, poll_sec=10, DocumentRoot=None, keyfile=None, certfile=None, show_task_args=True)

Creates an instance of HTTP server which will listen for connections at given host and port.

  • computation is an instance of Computation whose status will be sent to HTTP client (browser).
  • host should be a string, either IP address or name of the host. The default value of ‘’ will bind the server to all configured network interfaces.
  • port should be a number HTTP server binds to. Web browsers can monitor and manage cluster by connecting to http://<host>:<port> if SSL (https) is not used and https://<host>:<port> if SSL is used.
  • poll_sec is number of seconds (interval) the client waits between update requests to the server. Smaller values of poll_sec will cause more frequent updates so the information shown is more accurate, but cause more network traffic/load. Bigger values of poll_sec are more efficient but the status in browser may not reflect the most recent information.

    This value can be changed in the browser as well.

  • DocumentRoot is directory where monitor.html, pycos.css, etc. files needed for the service are stored. If this is not set, the directory is assumed to be data directory under the directory where pycos.httpd module is stored.

  • keyfile and certfile, if given, will be used to configure SSL so https can be used between browser and HTTP server. If both key and certificate are in the same file, then it should be given as certfile.

  • show_task_args boolean parameter controls whether task arguments are shown in web browser. Default value True sends task arguments to the browser. If tasks are created with large data as arguments, though, it may be quite inefficient to exchange that data between the scheduler and the browser. In such case, it is strongly recommended to set this parameter to False so viewing tasks on a server doesn’t cause performance issues. Note that httpd converts task arguments to strings (if arguments are not primitive types, the classes must provide serialization methods) before sending them to the browser. If serialization is not possible, httpd may fail. Setting show_task_args to False will prevent this.

    This parameter can also be updated dynamically in ‘Cluster’ page with web browser.

The HTTP server has following methods:

shutdown(wait=True)

Shuts down HTTP server. If wait is True, the server waits for current poll_sec period before shutting down so the web browser gets all the updates.

Note

When cluster status is being monitored, the HTTP server sends only changes to cluster status between updates to browser (for efficiency); i.e., each time browser gets the status update at current poll_sec interval, the server sends the changes since last time browser requested data. The browser computes full current status with these updates. Consequently, the status can be viewed in only one browser; if more than one browser is used, they will not get full information.
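
Why a second browser sees incomplete information can be shown with a toy model of incremental updates. The data layout (a dict of node address to changed fields) and the apply_changes function are invented for this sketch; they do not describe the actual messages exchanged between httpd and the browser.

```python
def apply_changes(view, changes):
    """Merge one update (node address -> changed fields) into a browser's view."""
    for addr, fields in changes.items():
        view.setdefault(addr, {}).update(fields)
    return view


# Two successive updates; each carries only what changed since the last request.
update1 = {'10.0.0.1': {'servers': 4, 'tasks_submitted': 2}}
update2 = {'10.0.0.1': {'tasks_done': 1}}

# The first browser receives every update and can rebuild the full status.
browser_a = {}
apply_changes(browser_a, update1)
apply_changes(browser_a, update2)

# A second browser that only receives the later update misses earlier changes.
browser_b = {}
apply_changes(browser_b, update2)

print(browser_a)
print(browser_b)
```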

status_task

This is a task that should get all status messages sent by scheduler. The client program should set status_task attribute to this task if the client needs to process status messages itself, or if messages need to be chained to other recipients.

6.9. Example

See dispycos_httpd1.py for an example.

6.10. Client (Browser) Interface

Once HTTP server is created, the dispycos servers can be monitored and managed in a web browser at http://<host>:8181, where <host> is name or IP address of computer running the program. If SSL certificates are used to set up HTTP server, https protocol should be used in URL above. There are currently 3 sections (menu items):

6.10.1. Cluster Status

The Cluster menu shows summary of nodes and tasks:

_images/cluster.png

Task summary shows total number of tasks submitted so far, done (finished or cancelled) and currently running tasks. The nodes summary shows IP address, number of servers running on that node, number of tasks submitted to all servers on that node, number of tasks done by all servers on that node, and number of currently running tasks by all servers on that node. Each node’s IP address is shown as a hyperlink; when the link is activated, the page changes to show status for that node, as explained in Node Status.

The nodes are sorted by default on the IP address in descending order. The field to sort on can be changed; however, as other fields are not necessarily unique, sorting on other fields is inefficient, so if there are many nodes, especially with frequent updates, choose IP address as the field to sort. Sorting can be changed even after cluster is closed.

‘Show Task Arguments’ checkbox controls whether task arguments are shown in ‘Server’ page. If tasks are created with large data as arguments, it is recommended to disable this, as otherwise exchanging that data between scheduler and web browser can be quite inefficient.

6.10.2. Node Status

Each node in Cluster Status section is a hyper link which when followed (to Node menu) shows details about that node, including servers available, tasks processed:

_images/node.png

The summary of node includes number of server processes running on the node, number of tasks running on each server. Each server is shown with its location (an instance of Location) as hyper link. This link can be used to get details of tasks running on that server, as explained in Server Status.

6.10.3. Server Status

As noted above, a dispycosnode program starts a dispycos server process for each CPU available (or as specified with -c option) on that node. Each server process runs tasks submitted by dispycos scheduler. The Server menu shows number of tasks submitted, number of tasks done, and details of each task currently running. The details show the name of task (function), the arguments used to run it and the time when it was started.

_images/server.png

The arguments are converted to strings by HTTP server before sending to the browser client. If any of these are instances of user provided classes, it may be useful to provide __str__ method. Otherwise, Python’s default __str__ method may show it as simply an instance of a class, which may not be very useful.
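
For example, a user class passed as a task argument could provide __str__ as below. Matrix and Opaque are hypothetical classes used only to contrast a custom string form with Python’s default one.

```python
class Matrix(object):
    """Hypothetical user class passed as an argument to a task."""

    def __init__(self, rows, cols):
        self.rows = rows
        self.cols = cols

    def __str__(self):
        # short, readable form suitable for showing in the browser
        return 'Matrix(%sx%s)' % (self.rows, self.cols)


class Opaque(object):
    """Hypothetical class without __str__; Python's default form is used."""
    pass


print(str(Matrix(3, 4)))
print(str(Opaque()))
```

The second line of output shows only the class name and an object address, which is rarely helpful when looking at a list of running tasks.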

If necessary, tasks can be selected and terminated (killed).

6.11. Docker Container

dispycosnode isolates computation environment so that jobs from one computation don’t interfere with jobs from another computation, even if a node is shared and jobs from different computations are running simultaneously. Usually, any files transferred and saved by jobs are also removed when computation is closed (the exception is when dest_path is given or if cleanup is False, when files may be left behind). However, the jobs have access to server’s file system so they can be a security risk. It is possible to avoid (some) issues by creating special user with access only to specific path (e.g., with a chroot environment).

If complete isolation of computation is needed, Docker containers can be used. Each container runs a copy of small Linux distribution with its own file system; the container has no access to host file system (unless it is configured to). pycos now includes Dockerfile under data directory where pycos module is installed, which can be obtained with the program:

import os, pycos
print(os.path.join(os.path.dirname(pycos.__file__), 'data', 'Dockerfile'))

Note that Docker runs under Linux host only; with other operating systems, a guest VM can be used to run Linux under which Docker can be run. See Docker Machine and Docker Docs for more details.

To build an image with latest Ubuntu Linux and pycos, install docker if not already installed, create a temporary directory, say, /tmp/pycos-docker, change to that directory and copy Dockerfile from above to that directory. (The Dockerfile can be customized to suit any additional tools or setup needed.) Then execute docker build -t pycos . (note the dot at the end). Full list of instructions for building image for Python 2.7 (for Python 3 use appropriate path to where Dockerfile is installed) is:

mkdir /tmp/pycos-docker
cd /tmp/pycos-docker
cp /usr/local/lib/python2.7/dist-packages/pycos/data/Dockerfile .
docker build -t pycos .

Once the image is built, a new container can be run with:

docker run --net=host -it pycos

to start dispycosnode.py (which is the default command for the image built above) with default options. --net=host runs container in host network mode, i.e., container uses host network configuration. See --save_config and --config options to dispycosnode to use same options across many runs. If these or any other options are needed, Dockerfile can be customized before building the image in the instructions above.

If each computation should be started in a new container (so that computations start in the same environment using the image built above), then serve option can be used as:

while :; do
    docker run --net=host -it pycos dispycosnode.py --serve 1
done

This causes dispycosnode to quit when the client closes currently running computation, which terminates container and because of while loop, a new container is started from the image.

dispy project also has similar instructions for building docker images. Since dispy depends on pycos, pycos modules, including dispycosnode, are also installed when installing dispy. So it is possible to build dispy and use dispycosnode (e.g., with docker run --net=host -it dispy dispycosnode.py) from dispy image instead of dispynode (when dispycosnode is more appropriate than dispynode).

6.12. Cloud Computing

ext_ip_addr of Node / Servers can be used to work with cloud computing service, such as Amazon EC2. Other cloud computing services can also be used similarly.

It may be necessary to set up the configuration to allow TCP ports used by dispycosnode. Here we assume ports 51347 and above are used by dispycosnode. For example, with EC2 a “Security Group” should be created and assigned to the instance so inbound TCP ports 51347 (and/or other ports used) are allowed.

With EC2 service, a node has a private IP address (called ‘Private DNS Address’) that uses private network of the form 10.x.x.x and public address (called ‘Public DNS Address’) that is of the form ec2-x-x-x-x.y.amazonaws.com. After launching instance(s), login to server(s), install pycos (e.g., with pip install pycos) and run dispycosnode on each node with:

dispycosnode.py --ext_ip_addr ec2-x-x-x-x.y.amazonaws.com --tcp_ports 51347

(this address can’t be used with -i/--ip_addr option, as the network interface is configured with private IP address only). This node can then be used by dispycos client from outside EC2 network by specifying ec2-x-x-x-x.y.amazonaws.com as a peer (see below). With ext_ip_addr, dispycosnode acts similar to NAT - it announces ext_ip_addr to other services instead of the configured ip_addr so that external services send requests to ext_ip_addr.

If the EC2 node can connect back to client with the IP address and port used by client, the node can be paired with:

...
yield pycos.Pycos().peer(pycos.Location('ec2-x-x-x-x.y.amazonaws.com', 51347))
if (yield computation.schedule()):
    raise Exception('Schedule failed')
...

By default, pycos uses random TCP port. Within a local network or if client can be reached at any port, this works fine. If the client is behind a router, the router’s firewall can be configured to forward a specific port, say, 4567 (or, 51347 at client as well; here, to avoid confusion a different port is used), to client’s IP address, and pycos can be configured in the client to use tcp port 4567 with:

pycos.Pycos(tcp_port=4567)

before any tasks or channels are created (creating a task or channel automatically starts pycos with default parameters, which uses random TCP port).

If client is behind a router and its firewall can’t be setup to forward port 4567, then ssh can be used to forward the port. To use this, first login to EC2 node with:

ssh -i ec2-key.pem -R 4567:127.0.0.1:4567 userid@ec2-x-x-x-x.y.amazonaws.com

Then start dispycosnode as mentioned above, and start pycos at client with:

pycos.Pycos(node='127.0.0.1', tcp_port=4567)

See dispycos_ssh_ec2.py for an example where ssh port forwarding is used for cloud computing with Amazon EC2.

In case of problems, enable debugging on the nodes (with -d option) and client (with pycos.logger.setLevel(logging.DEBUG) statement, as done in example above). If that still doesn’t work, check that the node is reachable with telnet ec2-x-x-x-x.y.amazonaws.com 51347 from client (after starting dispycosnode); the output should contain Connected message.
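
If telnet is not installed on the client, the same reachability check can be done with a few lines of Python. can_connect is a hypothetical helper written for this sketch (it is not part of pycos); the demonstration below probes a listener on the local machine so it can run anywhere, whereas in practice the call would be can_connect('ec2-x-x-x-x.y.amazonaws.com', 51347) after starting dispycosnode.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True


# Local demonstration: listen on a port chosen by the OS, then probe it.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
port = listener.getsockname()[1]
reachable = can_connect('127.0.0.1', port)
listener.close()
print(reachable)
```

A result of False only indicates that the TCP connection could not be made (e.g., blocked by a firewall or security group, or dispycosnode not running); it does not say which of these is the cause.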