7. Tutorial / Examples

Many examples are included in the ‘examples’ directory where the pycos module is installed (with PyPI / pip). See the README file in that ‘examples’ directory for a brief description of each program. A few of those examples are explained below in more detail.

7.1. Asynchronous Concurrent Programming

pycos’s concurrency framework has some features similar to the Actor Model. With pycos, units of computation are created as tasks. Each task has a message queue from which it receives messages sent to it by other tasks. In addition to this one-to-one communication between tasks, pycos supports one-to-many communication with broadcasting channels.

7.1.1. Tasks

A task in pycos is a computational unit created with a generator function, i.e., a Python function with one or more yield statements. Creating tasks is similar to creating threads, except that the task function must be a generator function and the task is created with Task instead of threading.Thread. If a generator function used for creating tasks has the default argument task=None, the Task constructor sets this parameter to the task instance being created. This parameter can thus be used to call methods on the task (e.g., receive(), sleep() etc.).

An example program that creates tasks is:

import pycos, random, time

def task_proc(n, task=None):
    s = random.uniform(0.5, 3)
    print('%f: task %d sleeping for %f seconds' % (time.time(), n, s))
    yield task.sleep(s)
    print('%f: task %d terminating' % (time.time(), n))

# create 10 tasks running generator function 'task_proc'
for i in range(10):
    # task function is called with 'i'
    pycos.Task(task_proc, i)

Tasks are created with the Task constructor. The first argument must be a generator function and the rest of the arguments should correspond to parameters in the generator definition. In the above program, the generator function task_proc has the task=None keyword argument, so the Task constructor sets task to the task created; this parameter should not be passed to the constructor. The constructor schedules execution of this task in pycos’s scheduler. In task_proc, the expression task.sleep(s) suspends execution of the running task for the given time. During that time, other tasks that are ready to execute will be executed. The total time taken by the above program should therefore be roughly the same as the maximum of the sleep times (at most 3 seconds in the above program), not their sum.

Note that since sleep() is a generator function, it must be called with yield (otherwise, task.sleep simply returns a new generator instance and thus will not suspend the execution as desired).
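To see why sleep() must be yielded, and why the total run time tracks the longest sleep rather than the sum, here is a toy cooperative scheduler. It is a stdlib-only sketch under simplifying assumptions, not pycos’s implementation: a virtual clock replaces real time, and a hypothetical `Sleep` request object plays the role of `task.sleep()`; `run` and `task_proc` here are likewise illustrative names.

```python
import heapq
import itertools


class Sleep:
    """Hypothetical request object a task yields to ask for a delay."""

    def __init__(self, seconds):
        self.seconds = seconds


def run(tasks):
    """Run generator tasks cooperatively on a virtual clock.

    Returns the virtual time at which the last task finished.
    """
    clock = 0.0
    counter = itertools.count()  # tie-breaker so the heap never compares generators
    ready = [(0.0, next(counter), t) for t in tasks]
    heapq.heapify(ready)
    while ready:
        wake_at, _, task = heapq.heappop(ready)
        clock = max(clock, wake_at)
        try:
            request = next(task)  # run the task until its next yield
        except StopIteration:
            continue  # task finished
        if isinstance(request, Sleep):
            # only this task waits; the others stay eligible to run
            heapq.heappush(ready, (clock + request.seconds, next(counter), task))
        else:
            heapq.heappush(ready, (clock, next(counter), task))
    return clock


log = []


def task_proc(n, s):
    log.append(('start', n))
    yield Sleep(s)  # without 'yield', Sleep(s) would just create an object
    log.append(('end', n))


sleeps = [2.5, 0.5, 1.75]
total = run(task_proc(i, s) for i, s in enumerate(sleeps))
print(total)  # virtual time of the longest sleep, not the sum
```

With sleeps of 2.5, 0.5 and 1.75 seconds, the virtual run ends at 2.5 rather than 4.75, and the tasks finish in order of their sleep times.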

7.1.2. Message Passing

With the concurrent and asynchronous behavior of tasks in pycos, communication among them is accomplished by sending and receiving messages. A message can be any Python object. If a message is sent to a remote task (i.e., a task running in another program), the message must be serializable. A task can either send a message to another task (one-to-one communication) or broadcast a message over a channel to many tasks (one-to-many communication). At the receiving task, messages are stored in a queue (similar to what is called a Mailbox in other concurrency frameworks) in the order they are received, so that receive() returns the oldest message, blocking until a message becomes available.

With one-to-one communication, a task invokes the send() method on the recipient task. Sending a message simply queues the message at the recipient; it doesn’t, for example, wait for the recipient to process the message. If necessary, the deliver() method may be used instead of send() when sending a message over the network; it indicates how many recipients received the message. See Asynchronous Concurrent Programming (pycos) for details.
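The mailbox semantics described above (send only queues; receive returns the oldest message and waits while the queue is empty) can be modelled with a toy, stdlib-only scheduler. This is a sketch, not pycos: the `Task` class, the `RECEIVE` marker and `run` are hypothetical stand-ins, and the toy run() simply stops when no task can make progress.

```python
from collections import deque

RECEIVE = object()  # hypothetical request a task yields to wait for a message


class Task:
    """Toy task: a generator plus a FIFO mailbox (not pycos.Task)."""

    def __init__(self, gen_func, *args):
        self.mailbox = deque()
        self.waiting = False  # True while blocked in receive()
        self.done = False
        self.gen = gen_func(*args, task=self)  # mimic the task=None convention

    def send(self, msg):
        # Sending only queues the message; the recipient does not run here.
        self.mailbox.append(msg)

    def receive(self):
        return RECEIVE


def run(tasks):
    """Resume tasks round-robin until none of them can make progress."""
    progress = True
    while progress:
        progress = False
        for t in tasks:
            if t.done or (t.waiting and not t.mailbox):
                continue  # finished, or still blocked on an empty mailbox
            value = t.mailbox.popleft() if t.waiting else None  # oldest first
            try:
                t.waiting = t.gen.send(value) is RECEIVE
            except StopIteration:
                t.done = True
            progress = True


received = []


def server_proc(task=None):
    while True:
        msg = yield task.receive()  # waits until a message is queued
        received.append(msg)


def client_proc(server, n, task=None):
    for x in range(2):
        server.send('%d / %d' % (n, x))  # returns immediately
        yield  # let other tasks run


server = Task(server_proc)
clients = [Task(client_proc, server, n) for n in range(2)]
run([server] + clients)
print(received)
```

The server sees the messages in the order they were queued, interleaved across clients, even though neither client ever waited for the server.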

An example client/server program with pycos is:

import pycos, random

def server_proc(task=None):
    task.set_daemon()
    while True:
        msg = yield task.receive()
        print('received %s' % (msg))

msg_id = 0

def client_proc(server, n, task=None):
    global msg_id
    for x in range(3):
        yield task.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

server = pycos.Task(server_proc)
for i in range(10):
    pycos.Task(client_proc, server, i)

The main program first creates the server task with the server_proc generator function, which has the task=None keyword parameter, so the Task constructor passes the task instance as task (thus, server in the main program is the same as task in server_proc). The main program then creates 10 client tasks with client_proc, passing server as the first argument and an identifier as the second argument. The main program has no use for the client tasks, so it doesn’t save them. Each client task repeatedly suspends itself for a brief period and sends a unique message to the server. Since server_proc never terminates on its own, it marks itself as a daemon task so that pycos can terminate it once all non-daemon tasks (in this case, the client tasks) have terminated (after sending 3 messages each); otherwise, pycos’s scheduler would never terminate, as the server task would still be running.
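The daemon rule can be illustrated with a toy round-robin loop that keeps running only while at least one non-daemon task is alive. This is a stdlib-only sketch of the termination condition described above, not pycos’s scheduler; `run`, `server` and `client` are hypothetical names. If the server were not marked as a daemon, the loop below would never end.

```python
from collections import deque


def run(tasks):
    """Toy round-robin scheduler (hypothetical, not pycos).

    'tasks' maps a name to (generator, is_daemon). The loop stops as soon
    as every task still alive is a daemon; those daemons are returned.
    """
    alive = deque(tasks.items())
    while any(not daemon for _, (_, daemon) in alive):
        name, (gen, daemon) = alive.popleft()
        try:
            next(gen)  # run the task up to its next yield
        except StopIteration:
            continue  # finished: drop it
        alive.append((name, (gen, daemon)))
    return [name for name, _ in alive]


def server():
    while True:  # never finishes on its own
        yield


def client(n):
    for _ in range(n):
        yield


left = run({'server': (server(), True),
            'c1': (client(2), False),
            'c2': (client(3), False)})
print(left)  # only the daemon server is still alive when the loop stops
```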

Unlike threads, tasks are not preempted by pycos’s scheduler: a running task keeps running until it yields. Thus, locking is not required with pycos as long as shared state is not read and then updated across a yield. To illustrate this, msg_id, a global, shared variable, is updated in client_proc without having to worry about non-deterministic values, because there is no yield between incrementing it and using it. pycos, however, provides locking primitives similar to those in Python’s threading module. Some of the methods of these locking primitives are generator methods (those that are blocking operations in the synchronous threading module), so they must be used with yield.
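The caveat about yield can be made concrete with a toy round-robin driver (stdlib only, not pycos’s scheduler; `run_round_robin`, `safe_inc` and `unsafe_inc` are hypothetical names). Four tasks each increment a shared counter three times. When the read-modify-write contains no yield, no update is lost; when a yield sits between the read and the write, the tasks overwrite each other’s stale values.

```python
def run_round_robin(gens):
    """Resume each generator in turn until all are exhausted (toy, not pycos)."""
    gens = list(gens)
    while gens:
        for g in list(gens):
            try:
                next(g)
            except StopIteration:
                gens.remove(g)


counter = {'safe': 0, 'unsafe': 0}


def safe_inc(times):
    for _ in range(times):
        counter['safe'] += 1  # read-modify-write with no yield in between
        yield  # switch point lies outside the update


def unsafe_inc(times):
    for _ in range(times):
        value = counter['unsafe']  # read
        yield  # switch point inside the update
        counter['unsafe'] = value + 1  # write a possibly stale value


run_round_robin(safe_inc(3) for _ in range(4))
run_round_robin(unsafe_inc(3) for _ in range(4))
print(counter)  # {'safe': 12, 'unsafe': 3}
```

The second case is where pycos’s locking primitives become useful.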

In this case the messages sent by clients are strings. If, say, the server needs to send a reply back to the client, then the messages can be a dictionary, tuple, list etc. that includes the client’s task instance (e.g., the list [task, msg_id, n, x], from which the server can retrieve the client task that sent the message).
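The reply pattern can be sketched with the same kind of toy mailbox scheduler (stdlib only; `Task`, `RECEIVE` and `run` are hypothetical stand-ins, not the pycos API). Each client puts its own task into the message as [task, msg_id, n, x], and the server uses that reference to send an acknowledgement back.

```python
from collections import deque

RECEIVE = object()  # hypothetical request a task yields to wait for a message


class Task:
    """Toy task: a generator plus a FIFO mailbox (not pycos.Task)."""

    def __init__(self, gen_func, *args):
        self.mailbox = deque()
        self.waiting = False
        self.done = False
        self.gen = gen_func(*args, task=self)

    def send(self, msg):
        self.mailbox.append(msg)  # queue only; never runs the recipient

    def receive(self):
        return RECEIVE


def run(tasks):
    """Resume tasks round-robin until none of them can make progress."""
    progress = True
    while progress:
        progress = False
        for t in tasks:
            if t.done or (t.waiting and not t.mailbox):
                continue
            value = t.mailbox.popleft() if t.waiting else None
            try:
                t.waiting = t.gen.send(value) is RECEIVE
            except StopIteration:
                t.done = True
            progress = True


replies = {}


def server_proc(task=None):
    while True:
        # the sender's task travels inside the message
        client, msg_id, n, x = yield task.receive()
        client.send(('ack', msg_id))


def client_proc(server, n, task=None):
    msg_id = 100 + n
    server.send([task, msg_id, n, 0])  # include own task so the server can reply
    replies[n] = yield task.receive()


server = Task(server_proc)
clients = [Task(client_proc, server, n) for n in range(3)]
run([server] + clients)
print(replies)  # each client received the ack for its own msg_id
```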

7.1.3. Channels

If one-to-many or broadcast communication is needed, pycos’s Channel can be used. To receive messages on a channel, a task must subscribe to it. After subscribing to a channel, any message sent to that channel will be received by each of its current subscribers.

These concepts are used in the program below, where a client sends a series of numbers over a channel. Two tasks receive these numbers to compute their sum and product:

import pycos, random

def seqsum(task=None):
    # compute sum of numbers received over channel
    result = 0
    while True:
        msg = yield task.receive()
        if msg is None:
            break
        result += msg
    print('sum: %f' % result)

def seqprod(task=None):
    # compute product of numbers received over channel
    result = 1
    while True:
        msg = yield task.receive()
        if msg is None:
            break
        result *= msg
    print('prod: %f' % result)

def client_proc(task=None):
    channel = pycos.Channel('sum_prod')
    # create two tasks to compute sum and product of
    # numbers sent over the channel
    sum_task = pycos.Task(seqsum)
    prod_task = pycos.Task(seqprod)
    yield channel.subscribe(sum_task)
    yield channel.subscribe(prod_task)
    for x in range(4):
        r = random.uniform(0.5, 3)
        channel.send(r)
        print('sent %f' % r)
    channel.send(None)
    yield channel.unsubscribe(sum_task)
    yield channel.unsubscribe(prod_task)

pycos.Task(client_proc)

A task can subscribe to as many channels as necessary. Messages from all such channels, as well as messages sent directly to the task, are received with the task.receive() method.

A channel, c2, may subscribe to another channel, c1, so that any message sent to c1 will also be received by all of c1’s subscribers, including c2, which in turn causes c2’s subscribers to receive that message as well. In this case, a message sent to c2 will not be received by c1. This way a hierarchy of channels can be created to reflect the hierarchy of components in a system.

Care must be taken not to create cycles in subscriptions within a channel hierarchy; e.g., channel c1 subscribing to channel c2 in the above example. pycos doesn’t detect cycles in subscriptions, and such a cycle will cause a runtime exception due to recursion.
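Both behaviours (one-way forwarding down a hierarchy, and unbounded recursion once a cycle exists) can be reproduced with a toy broadcast channel. This is a stdlib-only sketch, not pycos.Channel; `Channel` and `Inbox` are hypothetical classes, and Inbox stands in for a subscribing task.

```python
class Channel:
    """Toy broadcast channel (hypothetical, not pycos.Channel)."""

    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def send(self, msg):
        # Deliver to every current subscriber; a subscribed channel
        # re-broadcasts to its own subscribers.
        for sub in self.subscribers:
            sub.send(msg)


class Inbox:
    """Stand-in for a subscribing task: records what it receives."""

    def __init__(self):
        self.messages = []

    def send(self, msg):
        self.messages.append(msg)


c1, c2 = Channel('c1'), Channel('c2')
a, b = Inbox(), Inbox()
c1.subscribe(a)
c1.subscribe(c2)  # c2 gets everything sent to c1 ...
c2.subscribe(b)   # ... and forwards it to b

c1.send('to c1')  # reaches a and (via c2) b
c2.send('to c2')  # reaches b only: c1 does not subscribe to c2
before = (list(a.messages), list(b.messages))
print(before)  # (['to c1'], ['to c1', 'to c2'])

# Closing the loop (c1 subscribing to c2) makes every send recurse
# c1 -> c2 -> c1 -> ... until Python raises RecursionError.
c2.subscribe(c1)
cycle_error = False
try:
    c1.send('loop')
except RecursionError:
    cycle_error = True
print(cycle_error)
```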

7.2. Asynchronous Network Programming

Some of the Python library’s (synchronous) socket operations, such as connect, accept and recv, are blocking operations; i.e., they wait for the operation to complete. These blocking operations are not suitable for use with pycos, as while one task waits in them, all other eligible tasks are also blocked from executing.

pycos provides the Asynchronous Socket class to convert Python’s blocking socket into a non-blocking socket. Essentially, Asynchronous Socket is a wrapper that implements blocking operations as generator functions that can be used in tasks (with yield, as with any generator function).
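The idea behind such a wrapper can be sketched with the standard library alone: on a non-blocking socket, recv raises BlockingIOError instead of waiting, so a generator can yield control until a selector reports the socket readable. This is a toy illustration under that assumption, not pycos’s AsyncSocket; `async_recv` is a hypothetical name, and a real scheduler would run other tasks while waiting in select().

```python
import selectors
import socket


def async_recv(sock, bufsize):
    """Toy generator version of recv (a sketch, not pycos's AsyncSocket).

    Instead of blocking, it yields the socket it is waiting on; the code
    driving the generator resumes it once that socket is readable.
    """
    while True:
        try:
            return sock.recv(bufsize)  # delivered as StopIteration.value
        except BlockingIOError:
            yield sock


a, b = socket.socketpair()
a.setblocking(False)  # a plain recv would now raise, not wait

task = async_recv(a, 16)
waiting_on = next(task)  # no data yet: the task yields, nothing blocks

b.sendall(b'hello')  # the peer sends while other tasks could run

with selectors.DefaultSelector() as sel:
    sel.register(waiting_on, selectors.EVENT_READ)
    sel.select(timeout=5)  # a scheduler would wait here for any ready socket

data = None
try:
    task.send(None)  # resume the task: recv now succeeds
except StopIteration as stop:
    data = stop.value

print(data)
a.close()
b.close()
```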

For example, below is the server program that accepts connections and processes each connection:

import socket, sys, pycos

def process(conn, task=None):
    # sockets return bytes (in Python 3), so accumulate bytes
    data = b''
    while True:
        chunk = yield conn.recv(128)
        if not chunk:
            # peer closed the connection before sending the '/' marker
            break
        data += chunk
        if data.endswith(b'/'):
            break
    conn.close()
    print('received: %s' % data.decode())

def server_proc(host, port, task=None):
    task.set_daemon()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    sock.bind((host, port))
    sock.listen(128)

    while True:
        conn, addr = yield sock.accept()
        # process each connection in its own task
        pycos.Task(process, conn)

pycos.Task(server_proc, '127.0.0.1', 8010)
while True:
    cmd = sys.stdin.readline()
    # stop on 'exit' / 'quit', or when stdin is closed (readline returns '')
    if not cmd or cmd.strip().lower() in ('exit', 'quit'):
        break

There are two differences to note in the ‘server_proc’ task function compared to programming with threads: the TCP socket is converted to an asynchronous socket with Asynchronous Socket so it can be used in tasks, and accept is used with yield, as it is a generator function (in AsyncSocket). Then a new task is created to process the connection. The socket returned from accept of an asynchronous socket is also an asynchronous socket, so there is no need to convert it with Asynchronous Socket. In the ‘process’ task function, recv is used with yield, as it is also a generator function of an asynchronous socket.
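The framing loop in ‘process’ relies on two facts about stream sockets: a single recv may return only part of a message, and an empty result means the peer closed the connection. The same logic can be isolated as a plain, stdlib-only function for testing. This is a sketch: `recv_until_marker` is a hypothetical helper, and in the pycos task each call to recv would instead be `yield conn.recv(128)`. Like the server above, it assumes one message per connection.

```python
def recv_until_marker(recv, marker=b'/', bufsize=128):
    """Accumulate chunks from 'recv' until the data ends with 'marker'.

    'recv' is any callable like sock.recv; an empty chunk means the peer
    closed the connection, so stop instead of looping forever.
    """
    data = b''
    while not data.endswith(marker):
        chunk = recv(bufsize)
        if not chunk:  # connection closed before the marker arrived
            break
        data += chunk
    return data


# Simulated recv() results: the message arrives in three pieces.
chunks = iter([b'3: ---', b'---', b'-/'])
print(recv_until_marker(lambda n: next(chunks, b'')))
```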

Below is a client program that creates 10 tasks, each of which connects to the server above and sends a message. Each message ends with the marker ‘/’ so that the server can tell when it has received the full message:

import socket, sys, pycos, random

def client_proc(host, port, n, task=None):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    yield sock.connect((host, port))
    msg = '%d: ' % n + '-' * random.randint(100, 300) + '/'
    # sockets send bytes (in Python 3), so encode the message
    yield sock.sendall(msg.encode())
    sock.close()

# create 10 client tasks
for n in range(10):
    pycos.Task(client_proc, '127.0.0.1', 8010, n)

Here again, the TCP socket is converted to asynchronous socket with Asynchronous Socket so it can be used in tasks, and the operations connect and sendall are used with yield as these are generator functions.

In essence, using pycos for asynchronous network programming is very similar to programming with threads, except for creating tasks with Task instead of threads, converting the Python library’s sockets to asynchronous sockets with Asynchronous Socket, and using yield with certain methods.

7.3. Distributed Programming

pycos includes a scheduler (class Pycos) that runs tasks, suspends them when necessary, delivers messages etc. When a task is created with Task, as done in the above programs, the scheduler is started if it has not already been started. By default, the scheduler does not start the network services required for communicating with tasks or channels in another program. To use distributed programming, netpycos must be imported instead of pycos.

Tasks and channels can be registered with pycos so that they can be located by tasks running at a remote location. The reference (to a remote task or channel) obtained by locating can be used to send messages, to monitor it (in the case of a task) etc.

Using these features, the above client/server program can be separated into client and server programs that run at two different locations. The server program is:

import random, sys
import pycos.netpycos as pycos

def server_proc(task=None):
    task.set_daemon()
    task.register('server_task')
    while True:
        msg = yield task.receive()
        print('received %s' % (msg))

server = pycos.Task(server_proc)
while True:
    cmd = sys.stdin.readline()
    # stop on 'quit' / 'exit', or when stdin is closed (readline returns '')
    if not cmd or cmd.strip().lower() in ('quit', 'exit'):
        break

There are two differences between this version and the one in the concurrency section above:

  • netpycos is imported to start network services for distributed programming.
  • The server task registers itself with the name “server_task” so that clients can use that name to obtain a reference to this server, which can be used to send messages.

The client program is:

import random
import pycos.netpycos as pycos

def client_proc(n, task=None):
    global msg_id
    # locate() is accessed through the imported module (Task alone is undefined here)
    server = yield pycos.Task.locate('server_task')
    for x in range(3):
        yield task.suspend(random.uniform(0.5, 3))
        msg_id += 1
        server.send('%d: %d / %d' % (msg_id, n, x))

msg_id = 0
for i in range(10):
    pycos.Task(client_proc, i)

In this case there are two differences compared to the version in concurrent programming section above:

  • As is done in the server, netpycos is imported to start network services.
  • In the client task, a reference to the remote server is obtained using the name the server registered with. locate() is a generator function, so it must be called with yield.

The above client and server programs can be run either on the same computer or on different computers on the same network. Even if they are run on the same computer, the client and server tasks are considered remote to each other. Multiple instances of the client program can be run simultaneously, if desired.

If the client and server programs are run on computers on the same network (i.e., they share the same router or gateway), the schedulers discover each other. If the programs are on computers on different networks, the scheduler in the client program needs to be informed about the location of the server’s scheduler. This is done by adding the line yield Pycos().peer('remote_node') before locating by name, where remote_node is either the IP address or the name of the remote peer.

7.4. Distributed Communicating Tasks

While RTI (Remote Task Invocation) provides an API for creating pre-defined functionality that can be executed remotely, the module dispycos provides support for clients to send computations that can be executed remotely, optionally running them in parallel in separate processes to use multiple processors. See Distributed Communicating Tasks (dispycos) for details.

The program below sends the rtask_proc generator function to a server running the dispycosnode.py program, for creating (remote) tasks that simply sleep for the given number of seconds and send back the same number (as the result). Task client_proc creates the dispycos Scheduler in the client program itself (alternately, dispycos.py can be run as a separate program to which multiple clients can schedule computations). The same task is also set as status_task before scheduling the computation, so all status notifications are sent to it as messages; these are processed to find out which dispycos server processes are available to schedule new jobs, which jobs have finished etc. Alternately, jobs can simply be scheduled to execute and the scheduler will load balance the tasks; see dispycos_client.py and dispycos_client2.py in the ‘examples’ directory under the installation path:

import random
import pycos.dispycos as dispycos
import pycos.netpycos as pycos

# this generator function is sent to remote server to run
# tasks there
def rtask_proc(n, task=None):
    yield task.sleep(n)
    # the returned value is the task's result; with Python 2 use
    # 'raise StopIteration(n)' instead (since Python 3.7, raising
    # StopIteration inside a generator is turned into RuntimeError)
    return n

def client_proc(computation, njobs, task=None):
    status = {'submitted': 0, 'done': 0}

    def submit_job(where, task=None):
        arg = random.uniform(5, 20)
        rtask = yield computation.run_at(where, rtask_proc, arg)
        if isinstance(rtask, pycos.Task):
            print('%s processing %s' % (rtask.location, arg))
        else:
            print('Job %s failed: %s' % (status['submitted'], str(rtask)))
        status['submitted'] += 1

    dispycos.Scheduler()
    computation.status_task = task
    if (yield computation.schedule()):
        raise Exception('Failed to schedule computation')
    # job submitter assumes that a process can run at most one task at a time,
    # although more than one task (many thousands, if necessary) can be run
    while True:
        msg = yield task.receive()
        if isinstance(msg, pycos.MonitorException):
            rtask = msg.args[0]
            if msg.args[1][0] == StopIteration:
                print('Remote task %s finished with %s' % (rtask.location, msg.args[1][1]))
            else:
                pycos.logger.warning('Remote task %s terminated with "%s"' %
                                     (rtask.location, str(msg.args[1])))
            status['done'] += 1
            # because jobs are submitted with 'yield' with tasks,
            # and 'submitted' is incremented after 'yield', it is
            # likely that more than 'njobs' are submitted
            if status['done'] >= njobs and status['done'] == status['submitted']:
                break
            if status['submitted'] < njobs:
                # schedule another job at this process
                pycos.Task(submit_job, rtask.location)
        elif isinstance(msg, dispycos.StatusMessage):
            # a new process is ready (if special initialization is
            # required for preparing process, schedule it)
            if msg.status == dispycos.Scheduler.ProcInitialized:
                pycos.Task(submit_job, msg.location)
        else:
            pycos.logger.debug('Ignoring status message %s' % msg)
    yield computation.close()

if __name__ == '__main__':
    import logging
    pycos.logger.setLevel(logging.DEBUG)
    computation = dispycos.Computation([rtask_proc])
    # run 10 jobs
    pycos.Task(client_proc, computation, 10)

To test, run the dispycosnode.py program on a computer in the local network, and then run this client program.
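The job-submission policy in client_proc (start one job per process when it becomes ready, and start another on the same process whenever one finishes, until njobs have been submitted) can be explored offline with a small event simulation. This is a stdlib-only sketch of that policy, not dispycos itself; `simulate` and its parameters are hypothetical, and job durations are given explicitly instead of being random. Unlike the real client, submission here is synchronous, so exactly njobs jobs are submitted.

```python
import heapq


def simulate(njobs, nprocs, durations):
    """Toy model of the client's refill policy (not dispycos itself).

    Each process runs at most one job; when a job finishes and fewer than
    'njobs' have been submitted, another job starts on that same process.
    Returns (submitted, done, finish_time_of_last_job).
    """
    submitted = done = 0
    events = []  # heap of (finish_time, process, job)
    durations = iter(durations)

    def submit(proc, now):
        nonlocal submitted
        heapq.heappush(events, (now + next(durations), proc, submitted))
        submitted += 1

    for proc in range(nprocs):  # like ProcInitialized: one job per process
        if submitted < njobs:
            submit(proc, 0)

    finish = 0
    while events:
        finish, proc, job = heapq.heappop(events)  # like a MonitorException
        done += 1
        if submitted < njobs:  # refill the process that just became idle
            submit(proc, finish)
    return submitted, done, finish


print(simulate(njobs=5, nprocs=2, durations=[4, 1, 2, 3, 5]))
```

With two processes and five jobs, the process that finishes its short jobs early picks up more of the work, which is the load-balancing effect the status-driven loop aims for.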

If the tasks and client don’t need to communicate (as in the example above), it is easier to use the dispy project. If the tasks and client need to communicate, separating the scheduler from the client (by running dispycos.py as a separate program) makes this easier. See the ‘dispycos_client*.py’ files in the examples directory under the installation directory of pycos for additional use cases.