2. Asynchronous Concurrent Programming (pycos)

pycos provides an API for asynchronous and concurrent programming with tasks using Python’s generator functions. Tasks are like lightweight threads: creating and running tasks is very efficient. Moreover, unlike in thread programming, a task continues to run until it voluntarily gives up control (when yield is used), so locking is not needed to protect critical sections.

Programs developed with pycos have same logic and structure as programs with threads, except for a few syntactic changes. Although the API below has many methods, most of them are for additional features of pycos (such as message passing, hot swapping, monitoring etc.), and not needed for simple programs that are similar to thread based programs. The differences compared to threaded programming are:

  • Instead of creating threads, tasks should be created with Task. The task function (first argument to Task) should be a generator function (i.e., function with yield statements),

  • Sockets, pipes etc. should be converted to asynchronous versions with Asynchronous Socket, Asynchronous Pipe etc. (e.g., with async_sock = AsyncSocket(socket.socket())),

  • pycos’s locking primitives (pycos.Event, pycos.Condition, etc.) should be used in place of Python’s threading counterparts with yield on blocking operations (e.g., as yield async_event.wait()),

  • I/O operations, such as AsyncSocket’s send(), recv(), accept(), and blocking operations, such as task’s sleep(), Event’s wait(), etc., are implemented with generator methods; these should be used with yield (e.g., as data = yield async_sock.recv(1024)),

  • Task’s sleep() should be used in place of time.sleep() (e.g., as yield task.sleep(2)).

Tasks in pycos are essentially generator functions that suspend execution when yield is used and are resumed by pycos’s scheduler (Pycos) after the asynchronous operation is complete. Usually yield is used with an asynchronous call, such as socket’s connect(), send() or pipe’s read(), communicate(), waiting for a message etc. With such statements, the asynchronous call is initiated and control goes to scheduler which schedules another task, if one is ready to execute. When the asynchronous operation is complete, the task that called the operation becomes ready to execute. Thus, the tasks in pycos are not strictly cooperative tasks that pass control to each other, but each yield statement transfers control to pycos’s scheduler, which manages them. However, pycos supports message passing, suspend/resume calls etc., so that tasks can cooperate in a way that is easier to program and understand.

Unlike with threads, there is no forced preemption with tasks - at any time at most one task is executing and it continues to execute until yield is called. Thus, there is no need for locking critical sections with pycos.
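The following is a minimal sketch of why control changing hands only at yield makes locks unnecessary. It uses plain Python generators and a toy round-robin loop; worker, run_round_robin, counter and trace are hypothetical names for illustration, and this is not how the pycos scheduler is implemented.

```python
from collections import deque

counter = 0   # shared state, updated without any lock
trace = []    # records the order in which steps ran

def worker(name, steps):
    """Generator standing in for a task; each yield hands control back."""
    global counter
    for _ in range(steps):
        # Read-modify-write of shared state: no other generator can run
        # between these two lines, because control only changes at yield.
        value = counter
        counter = value + 1
        trace.append(name)
        yield

def run_round_robin(generators):
    """Toy scheduler: resume each generator in turn until all finish."""
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # generator finished; do not requeue it
        ready.append(gen)

run_round_robin([worker("a", 3), worker("b", 3)])
```

The two workers interleave only at their yield statements, so the unprotected update of counter never loses an increment.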

pycos framework consists of Pycos scheduler, Task to create tasks from generator functions, Channel to broadcast messages, Asynchronous Socket to convert regular synchronous sockets to asynchronous (non-blocking) sockets, Asynchronous Pipe for pipes, Asynchronous File for files, Client and dispycos_server() for distributed / parallel computing, Lock and RLock for locking (although locking is not required with pycos), Condition, Event, Semaphore primitives very similar to thread primitives (except for a few syntactic changes noted above).

2.1. Examples

See Asynchronous Concurrent Programming, Channels and Message Passing in tutorial for examples. There are many illustrative use cases in ‘examples’ directory under where pycos module is installed.

Following is a brief description of the examples included relevant to this section:

  • examples/tasks.py creates a number of tasks that each suspend execution for a brief period. The number of tasks created can be increased to thousands or tens of thousands to show pycos can scale well.

  • examples/client_server.py shows message passing (send() and receive() methods of tasks) between local client and server tasks. The remote version and local version are similar, except that remote versions register/locate tasks.

  • examples/channel.py uses broadcasting Channel to exchange messages in local tasks.

2.2. Configuration Parameters

The config.py file under the pycos installation defines the following configuration parameters:

  • PickleProtocolVersion is pickle (serialization) protocol version. Its default value of None implies pycos uses highest protocol version supported by Python. If it is 0, default protocol version is used. Otherwise, it should be a number that is protocol version. This setting should be same among all peers. This variable affects dispy as well.

  • MsgTimeout is maximum duration in seconds for sending a message (with send(), deliver()). If a message is not delivered for this long, it is considered an error. If more than MaxConnectionErrors errors happen in sending messages to a peer, that peer is considered not reachable and is closed.

  • MaxConnectionErrors is maximum number of errors allowed when communicating with a peer. See MsgTimeout parameter above.

  • IPV4_MULTICAST_GROUP is multicast group used by netpycos when using IPv4. Its default value is 239.255.97.5.

  • IPV6_MULTICAST_GROUP is multicast group used by netpycos when using IPv6. Its default value is ff05::674f:48ba:b409:3171:9705.

  • NetPort is port used by netpycos for network communication. Its default value is 9705.

  • MinPulseInterval is minimum interval in seconds for sending pulse messages between dispycos client, scheduler and dispycosnode. Pulse messages are used for checking that peers are reachable. Its default value is MsgTimeout defined above.

  • MaxPulseInterval is maximum interval in seconds for sending pulse messages between dispycos client, scheduler and dispycosnode. Its default value is 10 * MinPulseInterval.

  • DispycosSchedulerPort is port number used by dispycos scheduler. This must be an expression (to be evaluated with eval). Its default value is 'pycos.config.NetPort', so by default 9705 is the port used by dispycos scheduler.

  • DispycosNodePort is port number used by dispycosnode. This must be an expression (to be evaluated with eval). Its default value is 'pycos.config.DispycosSchedulerPort + 1', so by default 9706 is the port used by dispycosnode.
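As an illustration of the PickleProtocolVersion rules described above, the sketch below maps a configured value to a concrete pickle protocol. The helper resolve_pickle_protocol is hypothetical and not part of pycos, and reading "default protocol version" as pickle.DEFAULT_PROTOCOL is an assumption.

```python
import pickle

def resolve_pickle_protocol(configured):
    """Map a PickleProtocolVersion-style setting to a protocol number.

    Hypothetical helper for illustration; not part of pycos.
    """
    if configured is None:
        return pickle.HIGHEST_PROTOCOL   # None: highest version supported
    if configured == 0:
        return pickle.DEFAULT_PROTOCOL   # 0: assumed to mean Python's default
    return configured                    # otherwise: explicit version number

payload = {"msg": "ping", "seq": 1}
round_trips = []
for setting in (None, 0, 2):
    protocol = resolve_pickle_protocol(setting)
    data = pickle.dumps(payload, protocol=protocol)
    round_trips.append(pickle.loads(data) == payload)
```

Whatever value is chosen, all peers should use the same setting so that each side can read what the other writes.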

2.3. Pycos scheduler

Pycos is a (singleton) scheduler that runs tasks similar to the way operating system’s scheduler runs multiple processes. It is initialized automatically (for example, when a task is created), so for most purposes the scheduler is transparent. The scheduler in pycos manages tasks, message passing, I/O events, timeouts, wait/resume events etc., in a single concurrent program; it doesn’t provide distributed programming for message passing over network. netpycos extends Pycos with features supporting distributed programming, remote execution of tasks etc. If the scheduler instance is needed, it can be obtained with either Pycos() or Pycos.instance().

Unlike in other asynchronous frameworks, in pycos there is no explicit event loop - the I/O events are processed by the scheduler and methods in Asynchronous Socket, Asynchronous Pipe etc. For example, recv() method (which must be used with yield) sets up an internal function to execute when the socket has data to read and suspends the caller task. The scheduler can execute any other tasks that are ready while the I/O operation is pending. When the data has been read, the suspended task is resumed with the data read so that Asynchronous Socket's recv() works just as socket.recv(), except for using yield. Thus, programming with pycos is very similar to that with threads, except for using yield with certain methods.

class pycos.Pycos

Creates and returns singleton scheduler. If a scheduler instance has already been created (such as when a task was created), a new instance won’t be created. netpycos extends Pycos for distributed programming and the constructor there has various options to customize.

The scheduler has the following methods:

instance()

This static method returns instance of Pycos scheduler; use it as scheduler = Pycos.instance(). If the instance has not been started (yet), it creates one and returns it.

cur_task()

This static method returns task (instance of Task) being executed; use it as task = Pycos.cur_task(). As mentioned below, if task’s generator function has task=None parameter, Task constructor initializes it to the task instance (which is a way to document that method is used for creating tasks).

join()

Note

This method must be called from (main) thread only - calling from a task will deadlock entire task framework.

Waits for all scheduled non-daemon tasks to finish. After join returns, more tasks can be created (which are then added to scheduler).

finish()

Note

This method must be called from (main) thread only - calling from a task will deadlock entire task framework.

Wait for all non-daemon tasks to finish, then kill the daemon tasks and terminate the task scheduler. If necessary, a new scheduler instance may be created with Pycos() or Pycos.instance().

terminate()

Note

This method must be called from (main) thread only - calling from a task will deadlock entire task framework.

Kill all scheduled tasks and then terminate the task scheduler. If necessary, a new scheduler instance may be created with Pycos() or Pycos.instance().

The scheduler runs in a separate thread from user program. The scheduler terminates when all non-daemon tasks are terminated, similar to Python’s threading module.

2.4. Task

pycos’s Task class creates tasks (lightweight processes). Tasks are similar to threads in regular Python programs, except for a few differences as noted above.

class pycos.Task(target[, arg1, arg2, ...])

Creates a task, where target is a generator function (a function with yield statements), arg1, arg2 etc. are arguments or keyword arguments to target. If target generator function has task=None keyword argument, Task constructor replaces None with the instance of Task created, so task can use this to invoke methods in Task class (see below). Alternately, the instance can be obtained with the static method task = Pycos.cur_task().

Consider the generator function (where sock is asynchronous socket and all statements are asynchronous, so all are used with yield):

def get_reply(sock, msg, task=None):
    yield sock.sendall(msg)
    yield task.sleep(1)
    reply = yield sock.recv(1024)
    return reply

A task for processing the above function can be created with, for example, Task(get_reply, conn, "ping"). Task constructor creates a task with the (generator) function get_reply with parameters sock=conn, msg="ping" and task set to the just created task instance. (If task=None argument is not used, the task instance can be obtained with task = Pycos.cur_task().) The task is then added to Pycos scheduler so it executes concurrently with other tasks; there is no need to start it explicitly, as done with threads. Note that the return statement in the example assumes Python 3.7 or later. With Python versions before 3.7, a return statement such as return v should be replaced with raise StopIteration(v). Thus, with Python 3.6 or Python 2.7, the last statement in the above example should be raise StopIteration(reply).
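To see how a value returned from a generator function can reach whoever is driving it, consider this small sketch in plain Python 3. The names get_reply_stub and drive are hypothetical, and the driver only stands in for a scheduler; it is not pycos code.

```python
def get_reply_stub():
    """Stand-in for a task function: yields once, then returns a value."""
    yield "pretend this is an asynchronous operation"
    return "pong"

def drive(gen):
    """Run a generator to completion and hand back its return value."""
    try:
        while True:
            next(gen)
    except StopIteration as exc:
        # In Python 3, "return v" inside a generator surfaces as
        # StopIteration carrying v in its value attribute.
        return exc.value

result = drive(get_reply_stub())
```

This is why return v and the older raise StopIteration(v) idiom look the same to the code driving the generator.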

Blocking operations, such as socket.recv(), socket.connect(), are implemented as generator functions in asynchronous implementation of socket Asynchronous Socket. These functions simply initiate the operation; yield should be used with them (as in the example above) so scheduler can run other eligible tasks while the operation is pending. Calling these methods without yield simply returns a generator object, instead of the result of the method call. So care must be taken to use yield when calling generator functions. Using yield where it is not needed is not an error; e.g., resume() method of tasks can be used without yield, but when used with yield, the caller gives control to scheduler which may execute resumed task right away. In the rest of the documentation, methods that need to be called with yield are noted so.
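The pitfall of omitting yield can be seen with any generator function. In the sketch below, recv_stub is a hypothetical stand-in for a generator method such as an asynchronous recv(); no pycos code is involved.

```python
import inspect

calls = []  # records when the body of recv_stub actually runs

def recv_stub(bufsize):
    """Stand-in for a generator method of an asynchronous socket."""
    calls.append(bufsize)  # runs only once the generator is resumed
    yield

pending = recv_stub(1024)            # called without yield
is_gen = inspect.isgenerator(pending)
ran_before = list(calls)             # body has not started yet
next(pending)                        # something has to drive the generator
ran_after = list(calls)
```

Calling the function alone produces a generator object and executes none of its body, so a task that forgets yield gets neither the data nor the I/O.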

In rest of the documentation we follow the convention of using task=None keyword argument in generator methods and use task variable to refer to the task, i.e., instance of Task, executing the generator function. This variable can be used to invoke methods of Task, use it in other tasks, for example, to send messages to it, or wake up from sleep etc. A task has following methods:

task.suspend(timeout=None, alarm_value=None)
task.sleep(timeout=None, alarm_value=None)

Note

This method must always be used with yield as yield task.sleep().

Suspends task task until timeout. If timeout is a positive number (float or int), the scheduler suspends execution of task until that many seconds (or fractions of second). If timeout is None, the task is not woken up by the scheduler - some other task needs to resume it. The value yielded by this method is the value it is resumed with or alarm_value if resumed by the scheduler due to timeout. If timeout=0, this method returns alarm_value without suspending the task.

For example, if a task executes v = yield task.sleep(2.9), it is suspended for 2.9 seconds. If before timeout, another task wakes up this task (with resume() method) with a value, v is set to that value. Otherwise, after 2.9 seconds, this task is resumed with None (default alarm_value) so v is set to None and task continues execution. During the time task is suspended, scheduler executes other scheduled tasks. Within a task task.sleep (or task.suspend) must be used (with yield) instead of time.sleep; calling time.sleep will deadlock entire pycos framework.
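The way a suspended task receives either a resume value or alarm_value can be pictured with a generator's send() method. This is a sketch of the underlying Python mechanism only; napper and resume_with are hypothetical names, and a real scheduler would also handle timing.

```python
def napper(results):
    """Stand-in for a task that suspends and records what woke it."""
    value = yield ("sleep", 2.9)  # request handed to whoever drives us
    results.append(value)

def resume_with(gen_func, value):
    """Start the generator, then resume it with the given value."""
    results = []
    gen = gen_func(results)
    next(gen)            # run up to the yield (the sleep request)
    try:
        gen.send(value)  # value becomes the result of the yield expression
    except StopIteration:
        pass             # generator finished after being resumed
    return results[0]

woken_early = resume_with(napper, "data ready")  # like other.resume(...)
timed_out = resume_with(napper, None)            # like the default alarm_value
```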

other.resume(update=None)
other.wakeup(update=None)

Wakes up (suspended) task other. As explained above, the suspended task gets update (any Python object) as the value of yield statement that caused it to suspend. If sleep/resume synchronization is needed (so that resume waits until specific suspend is ready to receive), Event locking primitive can be used so that resuming task waits on an event variable and suspending task sets the event before going to sleep.

other.send(msg)

Sends message msg to the task other on which this method is invoked. If the task is currently waiting for messages (with receive()), then it is resumed with msg. If it is not currently waiting for messages, the message is queued so that next receive() returns the message without suspending.

Message can be any Python object when sender and recipient are in same program/pycos (i.e., messages are not sent over network). However, when sender and recipient are on different pycos instances (over network), the messages must be serializable at the sender and deserializable at the receiver. If message includes any objects that have attributes that cannot be serialized, then their classes have to provide __getstate__() method to serialize the objects, and the remote program should have __setstate__() for those classes; see Pickle protocol.
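A minimal sketch of a message class that carries an attribute pickle cannot handle is shown below. The Job class is hypothetical, and the round trip through pickle.dumps and pickle.loads only stands in for sending the object to a remote peer.

```python
import pickle
import threading

class Job:
    """Hypothetical message class with an attribute that cannot be pickled."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()  # lock objects cannot be pickled

    def __getstate__(self):
        # Serialize everything except the lock.
        state = self.__dict__.copy()
        del state["lock"]
        return state

    def __setstate__(self, state):
        # Restore the saved attributes and create a fresh lock.
        self.__dict__.update(state)
        self.lock = threading.Lock()

restored = pickle.loads(pickle.dumps(Job("resize")))
```

Both programs need the class definition, since the receiver calls __setstate__() on its own copy of the class.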

If the recipient is in a remote pycos, send() simply queues messages for transfer over network. A daemon task in pycos transfers the messages in the order they are queued. This task, by default, may transfer each message with a new connection. As creating sockets and making connections is expensive, it may be rather inefficient, especially if messages are sent frequently. See peer() method in Distributed Programming (netpycos) for specifying that messages to peers should be sent as stream, using same connection.

other.deliver(msg, timeout=None)

Note

This method must always be used with yield as recvd = yield other.deliver(msg).

Similar to send() except that this method must be used with yield and it returns status of delivering the message to the recipient. If it is 1, the message has been successfully placed in recipient task’s message queue (when recipient calls receive(), it gets the queued messages in the order they are put in the queue). If timeout is given and message couldn’t be delivered before timeout, the return value is 0. If timeout is None, delivery will not timeout.

For local tasks (i.e., tasks executing in the same program) timeout has no effect - if the recipient is valid, message will be delivered successfully right away. However, if the recipient is a remote task (see Distributed Programming (netpycos)), network delays / failures may cause delivery to be delayed or delivery may fail (i.e., there is a possibility of delivery waiting forever); to avoid such issues, appropriate timeout may be used. If message couldn’t be put in the recipient’s queue within given timeout, then the return value would be 0.

task.receive(timeout=None, alarm_value=None)
task.recv(timeout=None, alarm_value=None)

Note

This method must always be used with yield as msg = yield task.receive().

Returns earliest queued message if there are pending messages, or suspends task until either a message is sent to it or timeout seconds elapse. If called with timeout=0, this method will not suspend the task; it will return either earliest queued message if there are messages in the queue or alarm_value otherwise.

recv is synonym for receive.

task.set_daemon(flag=True)

Marks the task a daemon (task that may not terminate) if flag is True. Similar to threading module, Pycos scheduler waits for all non-daemon tasks to terminate before exiting. The daemon status can be toggled by calling set_daemon() with flag set to True or False. When scheduler is terminating, daemon tasks are terminated by throwing GeneratorExit exception; the daemon task can handle it and do any cleanup before exiting the function.

task.hot_swappable(flag)

Marks if the task’s generator function can be replaced. This method can be used to set (with flag=True) or clear (with flag=False) the flag. With hot swapping, a task’s code can be updated (to new functionality) while the application is running.

task.hot_swap(target[, arg1, arg2, ...])

Requests Pycos to replace task’s generator function with target([arg1, arg2, …]). Pycos then throws HotSwapException in the task when:

  • task indicated it can handle hot swap (i.e., last called hot_swappable with flag=True),

  • it is currently executing at top-level in the call stack (i.e., has not called other generator functions), and

  • has no pending asynchronous operations (socket I/O, tasks scheduled with AsyncThreadPool, etc.).

The new generator is set as args[0] of HotSwapException, so the task can inspect new generator, if necessary, and can do any preparation for hot swapping, e.g., saving state (perhaps by sending state as a message to itself which can be retrieved in the new generator with receive()), or even ignore hot swap request. If/when it is ready for swap, it must re-raise the same HotSwapException (with the new generator as args[0]). This causes Pycos to close current generator function, replace it with the new generator function and schedule new generator for execution (from the beginning). Any messages (i.e., resume updates) queued in the previous generator are not reset, so new generator can process queued messages (e.g., use receive() in a loop with timeout=0 until receive() returns alarm_value). Note that hot_swap() changes generator function of a particular task for which it is called. If there are many tasks using that generator function, hot_swap() may be called for each such task.

task.monitor(observe)

Note

This method must always be used with yield as v = yield task.monitor(observe).

Sets task as the monitor of task observe. Then, when the task observe is finished (either because task’s generator function finished execution or was terminated by Pycos because of an uncaught exception), Pycos sends the status as message with MonitorStatus to task.

A task can be monitored by more than one monitor, and a monitor can monitor more than one task.

other.throw(*args)

Throws exception *args to task other (at the point where it is currently executing).

other.terminate()

Terminates the task other. This is useful, for example, to terminate server tasks that otherwise never terminate.

other()
other.finish()

Note

This method must always be used in a task with yield as v = yield other() or v = yield other.finish().

Returns the result of task other; if other has not terminated yet, caller will wait until other terminates.

This method can also be called on remote tasks directly received from dispycos scheduler or from RPS (Remote Pico/Pycos Service). However, this method should not be used if such tasks are then sent to other remote peers.

If a task fails, the result is an instance of MonitorStatus. This is useful to detect and diagnose faults.

task.value()

Note

This method must be called from a thread, not a task.

Returns the result of task, possibly waiting until task terminates. This method should not be called from a task - this will cause entire task framework to deadlock. This method is meant for main thread in the user program to wait for (main) task(s) it creates.

If a task fails, the result is an instance of MonitorStatus. This is useful to detect and diagnose faults.

2.5. MonitorStatus

MonitorStatus is used to indicate either exit status / value of task being monitored or an exception. If it is about exit status, then the MonitorStatus object’s info attribute refers to the task (being monitored), type attribute refers to type of exit value (e.g., StopIteration if task exited without exceptions) and value attribute is the exit value of task. If status is for an exception (e.g., a task couldn’t be started due to invalid arguments), info attribute would be contextual information as a string (e.g., “invalid arguments to task”), type would be type of exception and value would be traceback as a string.

2.6. Locking Primitives

class pycos.Lock
class pycos.RLock
class pycos.Semaphore
class pycos.Event
class pycos.Condition

Note

With pycos locking is not needed, as there is no forced preemption: at any time at most one task is executing and the control is transferred to the scheduler only when yield statement is encountered. (In fact, the implementation of asynchronous locking primitives in pycos updates lists and counters without locking.) So with pycos Lock and RLock are optional.

pycos provides asynchronous implementations of Lock, RLock, Semaphore, Event and Condition primitives. They are similar to versions in threading module. Any operation that would block in threading module must be called with yield appropriately. For example, acquiring a lock is a blocking operation, so it should be invoked as yield lock.acquire(). Similarly, Event’s wait method or Condition’s wait method must be used as yield event.wait() or yield condition.wait(). For example, Condition variable cv in a client should be used as (compare to example at threading module):

while True:
    yield cv.acquire()
    while not an_item_is_available():
        yield cv.wait()
    get_an_available_item()
    cv.release()

See documentation strings in pycos module for more details on which methods should be used with yield and which methods need not be.

2.7. Channel

Channel is a broadcast mechanism with which tasks can exchange messages. Messages sent to Channel are sent to its subscribers (recipients). While a message can be sent one-to-one with task’s send() or deliver() methods on the receiving task, channels can be used to broadcast a message so all its subscribers get that message.

class pycos.Channel(name, transform=None)

Creates channel with name, which must be unique. If transform is given, it must be a function that is called before a message is sent to subscribers. The function is called with name of the channel and the message. It should return transformed message or None. If None is returned, the message is dropped and subscribers will not receive it. Otherwise, transformed message is sent to subscribers.
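A transform function might look like the sketch below, which tags each message with the channel name and drops debug messages. The function drop_debug, the channel name "log" and the message layout are assumptions for illustration; the function is exercised directly here, whereas with pycos it would be given as the transform argument of Channel.

```python
def drop_debug(channel_name, message):
    """Hypothetical transform: tag messages, drop those marked as debug."""
    if isinstance(message, dict) and message.get("level") == "debug":
        return None                  # None: subscribers never see this one
    return (channel_name, message)   # anything else: what subscribers get

samples = [
    {"level": "info", "text": "started"},
    {"level": "debug", "text": "tick"},
]

# Apply the transform the way a channel would before forwarding messages.
forwarded = []
for sample in samples:
    transformed = drop_debug("log", sample)
    if transformed is not None:
        forwarded.append(transformed)
```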

A channel has following methods.

subscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield channel.subscribe(task)

Subscribes subscriber (a task or even another channel) to the channel. Any messages sent to the channel are then sent to each subscriber; i.e., messages are broadcast to all subscribers. It is possible to chain or create hierarchical channels with channels subscribing to other channels. If timeout is a positive number, the call fails if subscription is not successful (e.g., the channel couldn’t be located) before that many seconds.

send(message)

Calls transform function of the channel (see above) if it has one. If the function returns None, the message is ignored. Otherwise the message is sent to current subscribers. Messages sent over a channel are queued (buffered) at receiving tasks. A task task, for example, that has subscribed to the channel can receive messages with msg = yield task.receive().

deliver(message, timeout=None, n=0)

Note

This method must be used with yield as recvd = yield channel.deliver(msg)

Similar to send(), except that it waits until at least n subscribers are subscribed. It returns total number of end-point recipients (tasks) the message has been delivered to; in case of hierarchical channels, it is the sum of recipients of all the channels. This may be less than n (e.g., delivering message to a subscriber may fail, due to network error), or more (e.g., there are more subscribers, or some subscribers are channels with more than one subscriber). If n is 0, then the message will be delivered to all current subscribers. In any case, timeout is maximum amount of time in seconds (or fraction of second) for delivering the message. Thus, for example, if timeout occurs before n subscribers are subscribed to the channel, the method returns 0.

unsubscribe(subscriber, timeout=None)

Note

This method must be used with yield as yield channel.unsubscribe(task)

Unsubscribes the subscriber (task or another channel), so future messages to the channel are not sent to that subscriber. If timeout is a positive number, it is the number of seconds for unsubscribe request to complete.

close()

Close the channel. The channel can’t be used for message passing after closing.

set_transform(transform)

Set/change transform as the method to call when message is sent to this channel. See Channel constructor and send().

2.8. Message Passing

Task’s send(), receive() and deliver() offer one-to-one message passing and Channel’s send() and deliver() offer one-to-many / broadcast message passing.

pycos delivers messages in the order they have been sent with either one-to-one or broadcast message passing (i.e., with either send or deliver methods of tasks or channels); i.e., pycos guarantees temporal order of messages.

2.9. AsyncThreadPool

pycos framework and all tasks run in a single thread. It implements concurrency (running more than one task) by interleaving tasks: suspending a task that is waiting for some event and running a task that is ready to execute. All the blocking operations, such as sending/receiving data (sockets, message passing), or sleeping, are implemented with generator functions that schedule the operation and suspend the task. However, pycos framework doesn’t implement every blocking operation. Sometimes, it is necessary to use functions in other modules that block the thread until the operation is complete. For example, reading standard input will block the thread until the read method is complete. If such functions are used in a task, entire pycos framework and all tasks are blocked; i.e., pycos scheduler itself is blocked, so even if there are other tasks eligible to run, they won’t be executed. AsyncThreadPool class can be used to run such blocking functions in separate threads so pycos itself is not affected by them.

class pycos.AsyncThreadPool(num_threads)

Creates a pool with given number of threads. When a blocking function is scheduled, an available thread in the pool is used to execute that function. More threads will allow more blocking functions to be running simultaneously, but take more system resources.

async_task(target, *args, **kwargs)

Note

This method must be used with yield as val = yield pool.async_task(target, *args, **kwargs)

Schedules given target function with arguments *args and keyword arguments **kwargs for execution with a thread in the pool. If all threads are currently executing other functions, then the function will be executed when a thread becomes available (i.e., done with currently executing function).

The value returned by this method is the value returned by the function.

join()

Waits for all scheduled blocking functions to finish. This method should be called from main thread, not from any task, as this method is a blocking operation.

terminate()

Waits for all scheduled blocking functions to finish and then terminate the threads; the pool can no longer be used for scheduling tasks. This method should be called from main thread, not from any task, as this method is a blocking operation.

See examples/chat_client.py which uses thread pool (with 1 thread) to execute sys.stdin.readline (a blocking function).