Streaming subprocess stdout and stderr with asyncio in Python

I keep finding myself facing this problem where I want to run an external process in Python. There are a few ways to do it in the standard library:

  • subprocess
  • os.system
  • os.spawn* (os.spawnl, os.spawnv, and friends)
  • os.popen
  • popen2
  • commands

All of these are deprecated or discouraged in favor of subprocess, and popen2 and commands are gone entirely in Python 3.

As an example, this will fork and run a subprocess (stdout and stderr are inherited from the parent process, so they just print to your terminal) and capture the return code.

import subprocess
status = subprocess.call("mycmd myarg", shell=True)
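
Since Python 3.5 there's also subprocess.run, which wraps the same machinery and hands back a CompletedProcess object. A quick sketch with the same placeholder command:

import subprocess

# run() returns a CompletedProcess; .returncode holds the exit status
result = subprocess.run("mycmd myarg", shell=True)
status = result.returncode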

OK, great, but what if I want access to stdout and stderr?

Subprocess still has you covered:

proc = subprocess.Popen(["mycmd", "myarg"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
# communicate() blocks until the process exits, then returns its full stdout and stderr

OK, that's all great, but what if I want to stream stdout and stderr? This will give continuous feedback as the process runs, or allow me to stream what's going on with the command through a network socket.

This is where things get complicated. As far as I can tell, you can stream stdout, and you can combine stdout and stderr together and stream that, but if you want to stream both stdout and stderr, while still keeping them separate, you are trying to do 2 things at once.
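
The combined case, at least, is straightforward with plain subprocess. A minimal sketch, again with the placeholder command:

import subprocess

proc = subprocess.Popen(
    ["mycmd", "myarg"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # fold stderr into the stdout pipe
)
# Each line arrives as the process emits it, but stdout and stderr
# are now indistinguishable.
for line in proc.stdout:
    print(line.decode(), end="")
proc.wait()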

Well, traditionally, if you want to do 2 things at the same time in Python, you could use threads or processes. Processes won't work here since I would have to run the same command twice. Threads work, and that's what libraries like sarge seem to use.
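
For comparison, here's a rough sketch of the threaded approach (my own sketch of the general shape, not sarge's actual internals): one reader thread per pipe, each feeding a callback.

import subprocess
import threading

def _pump(pipe, cb):
    # Read lines off one pipe until EOF and hand each to the callback.
    for line in iter(pipe.readline, b""):
        cb(line)
    pipe.close()

def execute_threaded(cmd, stdout_cb, stderr_cb):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    threads = [
        threading.Thread(target=_pump, args=(proc.stdout, stdout_cb)),
        threading.Thread(target=_pump, args=(proc.stderr, stderr_cb)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return proc.wait()

It works, but now you're juggling threads just to read two pipes.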

But this is 2016, and we have asyncio.subprocess and Python 3.5 with async/await syntax.

import asyncio

async def _read_stream(stream, cb):
    # Pull lines off the stream as they arrive; readline() returns b'' at EOF.
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break

async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    process = await asyncio.create_subprocess_exec(*cmd,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

    # Read both pipes concurrently. gather() accepts bare coroutines,
    # unlike asyncio.wait(), which requires them wrapped in tasks on newer Pythons.
    await asyncio.gather(
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb),
    )
    return await process.wait()


def execute(cmd, stdout_cb, stderr_cb):
    # Drive the coroutine to completion and return the subprocess's exit code.
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(
        _stream_subprocess(cmd, stdout_cb, stderr_cb)
    )
    loop.close()
    return rc

if __name__ == '__main__':
    print(execute(
        ["bash", "-c", "echo stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done"],
        # Each callback receives one raw bytes line at a time.
        lambda x: print("STDOUT: %s" % x.decode().rstrip()),
        lambda x: print("STDERR: %s" % x.decode().rstrip()),
    ))

OK, yeah, that's a lot of code. But it streams! And it all runs in one thread. You call it with two callbacks: one invoked for each line that arrives on stdout, and one for each line on stderr. The whole thing returns the return code of the subprocess.
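
The callbacks don't have to print, of course. Here's a small usage sketch that reuses execute() from above to collect the lines in memory instead:

out_lines, err_lines = [], []
rc = execute(
    ["bash", "-c", "echo hello && echo oops 1>&2"],
    out_lines.append,  # each callback just stashes the raw bytes line
    err_lines.append,
)
print(rc, out_lines, err_lines)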