Periodically run Bash Job A to conditionally restart Bash Job B based on output of Bash Job A

Problem

Is there any neater way I can accomplish the task (as in the title)? Also, I’m not sure why asyncio.Task.all_tasks() explodes and whether this is bad or not.

from concurrent.futures import ProcessPoolExecutor
import asyncio
from contextlib import contextmanager
import random


@contextmanager
def event_loop():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(3) as executor:
        loop.set_default_executor(executor)
        yield loop
    loop.close()


async def start_long_proc(delay=0, *, started_event):
    await asyncio.sleep(delay)
    cmd = 'while true; do date >> temp.log; sleep 0.1; done;'
    proc = await asyncio.create_subprocess_shell(cmd)
    started_event.set()
    return proc


async def periodic_monitor():
    while True:
        n = random.randrange(100)
        cmd = f'echo "{n}"'
        proc = await asyncio.create_subprocess_shell(
            cmd, stdout=asyncio.subprocess.PIPE)
        stdout, stderr = await proc.communicate()
        yield int(stdout.decode().rstrip())


async def f():
    started_event = asyncio.Event()

    # run Bash Job B
    long_proc = await start_long_proc(started_event=started_event)

    p_terminate = 10
    monitor_interval = 1

    async for n in periodic_monitor():  # periodically run Bash Job A
        print(f'n={n}')
        if n < p_terminate:
            print('terminated and reschedule long proc')
            long_proc.terminate()  # kill Bash Job B
            started_event.clear()
            long_proc = await start_long_proc(5, started_event=started_event)
        await asyncio.sleep(monitor_interval)


if __name__ == '__main__':
    with event_loop() as loop:
        asyncio.ensure_future(f())
        loop.run_forever()

Solution

Is there any neater way I can accomplish the task?

        long_proc.terminate()  # kill Bash Job B

Well, sending SIGTERM is effective, but since you’re in control of what Job B runs you might prefer to have it do IPC so it gracefully shuts down after completing a processing phase, rather than being interrupted at some arbitrary point. Could be as simple as creating a sentinel /tmp file that B looks for.
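A minimal sketch of the sentinel-file idea, assuming a POSIX shell. The function name `run_until_sentinel`, the `stop` file name, and the temporary work directory are illustrative choices, not part of the original code. Job B checks for the sentinel between iterations, so it always finishes the current `date` write before exiting:

```python
import asyncio
import os
import tempfile


async def run_until_sentinel(workdir):
    """Start a shell loop (Job B) that exits once a sentinel file appears."""
    sentinel = os.path.join(workdir, 'stop')  # hypothetical sentinel path
    log = os.path.join(workdir, 'temp.log')
    # Job B tests for the sentinel before each processing phase.
    cmd = (f'while [ ! -e "{sentinel}" ]; do '
           f'date >> "{log}"; sleep 0.1; done')
    proc = await asyncio.create_subprocess_shell(cmd)

    await asyncio.sleep(0.5)       # let Job B do some work
    open(sentinel, 'w').close()    # ask Job B to stop after its current phase
    # Reap the child; the timeout guards against a Job B that never notices.
    return await asyncio.wait_for(proc.wait(), timeout=5)


def demo():
    with tempfile.TemporaryDirectory() as workdir:
        return asyncio.run(run_until_sentinel(workdir))
```

Calling `demo()` returns the shell's exit status. In the monitoring loop, creating the sentinel would take the place of `long_proc.terminate()`, and the sentinel would need to be removed again before Job B is restarted.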

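A bigger issue is that you never wait for the child to exit, i.e. there is no wait4() to reap it. On an asyncio subprocess, wait() is a coroutine, so it has to be awaited:

        long_proc.terminate()
        await long_proc.wait()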

Instead you wrote a race with a 5-second delay to encourage the child to win. That long sleep implies wasted idle time. You could run the CPU flat out by synchronizing with the exiting child and restarting Job B as soon as it has actually exited.
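One way this could look, as a sketch rather than a drop-in replacement: the helper name `restart` and the shortened shell command are assumptions for illustration. The replacement is started as soon as the old child has been reaped, with no fixed delay:

```python
import asyncio


async def restart(proc, cmd):
    """Terminate proc, reap it, then start a replacement immediately."""
    proc.terminate()     # send SIGTERM to the old Job B
    await proc.wait()    # block this coroutine until the child has exited
    return await asyncio.create_subprocess_shell(cmd)


async def demo():
    cmd = 'while true; do sleep 0.1; done'  # stand-in for Job B
    first = await asyncio.create_subprocess_shell(cmd)
    second = await restart(first, cmd)
    old_code = first.returncode  # set once wait() has returned

    # Clean up the replacement so the demo leaves nothing running.
    second.terminate()
    await second.wait()
    return old_code
```

In `f()`, the `terminate()` call and the delayed `start_long_proc(5, ...)` could then be replaced by something like `long_proc = await restart(long_proc, cmd)`. On POSIX, the return code of a signal-killed child is reported as a negative number.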
