Pyjo.IOLoop - Minimalistic event loop

import Pyjo.IOLoop

# Listen on port 3000
@Pyjo.IOLoop.server(port=3000)
def server(loop, stream, cid):

    @stream.on
    def read(stream, chunk):
        # Process input chunk
        print("Server: {0}".format(chunk.decode('utf-8')))

        # Write response
        stream.write(b"HTTP/1.1 200 OK\x0d\x0a\x0d\x0a")

        # Disconnect client
        stream.close_gracefully()

# Connect to port 3000
@Pyjo.IOLoop.client(port=3000)
def client(loop, err, stream):

    @stream.on
    def read(stream, chunk):
        # Process input
        print("Client: {0}".format(chunk.decode('utf-8')))

    # Write request
    stream.write(b"GET / HTTP/1.1\x0d\x0a\x0d\x0a")

# Add a timer
@Pyjo.IOLoop.timer(3)
def timeouter(loop):
    print("Timeout")
    # Shutdown server
    loop.remove(server)

# Start event loop if necessary
if not Pyjo.IOLoop.is_running():
    Pyjo.IOLoop.start()

Pyjo.IOLoop is a very minimalistic event loop based on Pyjo.Reactor. It has been reduced to the absolute minimal feature set required to build solid and scalable non-blocking TCP clients and servers.

Events

Pyjo.IOLoop inherits all events from Pyjo.EventEmitter and can emit the following new ones.

finish

@loop.on
def finish(loop):
    ...

Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be closed.

Debugging

You can set the PYJO_IOLOOP_DEBUG environment variable to get some advanced diagnostic information printed to sys.stderr.

PYJO_IOLOOP_DEBUG=1

Classes

exception Pyjo.IOLoop.Error

Exception raised on unhandled error event.

class Pyjo.IOLoop.Pyjo_IOLoop(**kwargs)

Pyjo.IOLoop inherits all attributes and methods from Pyjo.EventEmitter and implements the following new ones.

acceptor(acceptor)
server = Pyjo.IOLoop.acceptor(cid)
server = loop.acceptor(cid)
cid = loop.acceptor(Pyjo.IOLoop.Server.new())

Get Pyjo.IOLoop.Server object for id or turn object into an acceptor.

client(cb=None, **kwargs)
cid = Pyjo.IOLoop.client(cb, address='127.0.0.1', port=3000)

@Pyjo.IOLoop.client(address='127.0.0.1', port=3000)
def cid(loop, err, stream):
    ...

cid = loop.client(cb, address='127.0.0.1', port=3000)

@loop.client(address='127.0.0.1', port=3000)
def cid(loop, err, stream):
    ...

Open TCP connection with Pyjo.IOLoop.Client, takes the same arguments as Pyjo.IOLoop.Client.connect().

delay(*args)
delay = Pyjo.IOLoop.delay()
delay = loop.delay()
delay = loop.delay(cb)
delay = loop.delay(cb1, cb2)

Build Pyjo.IOLoop.Delay object to manage callbacks and control the flow of events for this event loop, which can help you avoid deeply nested closures and memory leaks that often result from continuation-passing style. Callbacks will be passed along to Pyjo.IOLoop.Delay.steps().

# Synchronize multiple events
delay = Pyjo.IOLoop.delay()

@delay.step
def step(delay):
    print('BOOM!')

for i in range(10):
    end = delay.begin()

    def timer_wrap(i):
        def timer_cb(loop):
            print(10 - i)
            end()
        return timer_cb

    Pyjo.IOLoop.timer(timer_wrap(i), i)

delay.wait()

# Sequentialize multiple events
delay = Pyjo.IOLoop.delay()

# First step (simple timer)
@delay.step
def step1(delay):
    Pyjo.IOLoop.timer(delay.begin(), 2)
    print('Second step in 2 seconds.')

# Second step (concurrent timers)
@delay.step
def step2(delay):
    Pyjo.IOLoop.timer(delay.begin(), 1)
    Pyjo.IOLoop.timer(delay.begin(), 3)
    print('Third step in 3 seconds.')

# Third step (the end)
@delay.step
def step3(delay):
    print('And done after 5 seconds total.')

delay.wait()

# Handle exceptions in all steps
delay = Pyjo.IOLoop.delay()

@delay.step
def step1(delay):
    raise Exception('Intentional error')

@delay.step
def step2(delay, *args):
    print('Never actually reached.')

@delay.catch
def catch(delay, err):
    print("Something went wrong: " + err)

delay.wait()
is_running
boolean = Pyjo.IOLoop.is_running()
boolean = loop.is_running

Check if event loop is running.

if not Pyjo.IOLoop.is_running():
    Pyjo.IOLoop.start()
max_accepts = None
max_accepts = loop.max_accepts
loop.max_accepts = 1000

The maximum number of connections this event loop is allowed to accept before shutting down gracefully without interrupting existing connections, defaults to 0. Setting the value to 0 will allow this event loop to accept new connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing between multiple server processes.

max_connections = None
max_connections = loop.max_connections
loop.max_connections = 1000

The maximum number of concurrent connections this event loop is allowed to handle before it stops accepting new incoming connections, defaults to 1000.

multi_accept = None
multi = loop.multi_accept
loop.multi_accept = 100

Number of connections to accept at once, defaults to 50, or to 1 if the value of max_connections is smaller than 50.

next_tick(*args)
Pyjo.IOLoop.next_tick(cb)
loop.next_tick(cb)

Invoke callback as soon as possible, but not before returning; always returns None.

# Perform operation on next reactor tick
@Pyjo.IOLoop.next_tick
def do_something(loop):
    ...
one_tick()
Pyjo.IOLoop.one_tick()
loop.one_tick()

Run event loop until an event occurs. Note that this method can recurse back into the reactor, so you need to be careful.

# Don't block longer than 0.5 seconds
tid = Pyjo.IOLoop.timer(lambda loop: None, 0.5)
Pyjo.IOLoop.one_tick()
Pyjo.IOLoop.remove(tid)
reactor = None
reactor = loop.reactor
loop.reactor = Pyjo.Reactor.new()

Low-level event reactor, usually a Pyjo.Reactor.Poll or Pyjo.Reactor.Select object with a default subscriber to the error event.

# Watch if handle becomes readable or writable
def io_cb(reactor, writable):
    if writable:
        print('Handle is writable')
    else:
        print('Handle is readable')

loop.reactor.io(io_cb, handle)

# Change to watching only if handle becomes writable
loop.reactor.watch(handle, read=False, write=True)

# Remove handle again
loop.reactor.remove(handle)
recurring(*args)
tid = Pyjo.IOLoop.recurring(cb, 3)
tid = loop.recurring(cb, 0)
tid = loop.recurring(cb, 0.25)

Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.

@Pyjo.IOLoop.recurring(5)
def do_something(loop):
    ...
remove(taskid)
Pyjo.IOLoop.remove(taskid)
loop.remove(taskid)

Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their write buffers.

reset()
Pyjo.IOLoop.reset()
loop.reset()

Remove everything and stop the event loop.

server(cb=None, **kwargs)
cid = Pyjo.IOLoop.server(cb, port=3000)

@Pyjo.IOLoop.server(port=3000)
def server(loop, stream, cid):
    ...

cid = loop.server(cb, port=3000)

@loop.server(port=3000)
def server(loop, stream, cid):
    ...

Accept TCP connections with Pyjo.IOLoop.Server, takes the same arguments as Pyjo.IOLoop.Server.listen().

# Listen on port 3000
@Pyjo.IOLoop.server(port=3000)
def server(loop, stream, cid):
    ...

# Listen on random port
@Pyjo.IOLoop.server(address='127.0.0.1')
def server(loop, stream, cid):
    ...

port = Pyjo.IOLoop.acceptor(server).port
start()
Pyjo.IOLoop.start()
loop.start()

Start the event loop; this will block until stop() is called. Note that some reactors stop automatically if there are no events being watched anymore.

# Start event loop only if it is not running already
if not Pyjo.IOLoop.is_running():
    Pyjo.IOLoop.start()
stop()
Pyjo.IOLoop.stop()
loop.stop()

Stop the event loop. This will not interrupt any existing connections, and the event loop can be restarted by running start() again.

stop_gracefully()
Pyjo.IOLoop.stop_gracefully()
loop.stop_gracefully()

Stop accepting new connections and wait for all existing connections to be closed before stopping the event loop.

stream(stream)
stream = Pyjo.IOLoop.stream(cid)
stream = loop.stream(cid)
cid = loop.stream(Pyjo.IOLoop.Stream.new())

Get Pyjo.IOLoop.Stream object for id or turn object into a connection.

# Increase inactivity timeout for connection to 300 seconds
Pyjo.IOLoop.stream(cid).timeout = 300
timer(*args)
tid = Pyjo.IOLoop.timer(cb, 3)
tid = loop.timer(cb, 0)
tid = loop.timer(cb, 0.25)

Create a new timer, invoking the callback after a given amount of time in seconds.

# Perform operation in 5 seconds
@Pyjo.IOLoop.timer(5)
def timer_cb(loop):
    ...
Pyjo.IOLoop.object

alias of Pyjo_IOLoop

Pyjo.IOLoop.singleton = <Pyjo.IOLoop.Pyjo_IOLoop object>
loop = Pyjo.IOLoop.singleton

The global Pyjo.IOLoop singleton, used to access a single shared event loop object from everywhere inside the process.

# Many methods also allow you to take shortcuts
Pyjo.IOLoop.timer(lambda loop: Pyjo.IOLoop.stop(), 2)
Pyjo.IOLoop.start()

# Restart active timer
@Pyjo.IOLoop.timer(3)
def timeouter(loop):
    print('Timeout!')

Pyjo.IOLoop.singleton.reactor.again(timeouter)