Problem
Is there any neater way I can accomplish the task (as in title)? Also, I’m not sure why asyncio.Task.all_tasks()
explodes and whether this is bad or not.
from concurrent.futures import ProcessPoolExecutor
import asyncio
from contextlib import contextmanager
import random
@contextmanager
def event_loop():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(3) as executor:
loop.set_default_executor(executor)
yield loop
loop.close()
async def start_long_proc(delay=0, *, started_event):
await asyncio.sleep(delay)
cmd = 'while true; do date >> temp.log; sleep 0.1; done;'
proc = await asyncio.create_subprocess_shell(cmd)
started_event.set()
return proc
async def periodic_monitor():
while True:
n = random.randrange(100)
cmd = f'echo "{n}"'
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
yield int(stdout.decode().rstrip())
async def f():
started_event = asyncio.Event()
# run Bash Job B
long_proc = await start_long_proc(started_event=started_event)
p_terminate = 10
monitor_interval = 1
async for n in periodic_monitor(): # periodically run Bash Job A
print(f'n={n}')
if n < p_terminate:
print('terminated and reschedule long proc')
long_proc.terminate() # kill Bash Job B
started_event.clear()
long_proc = await start_long_proc(5, started_event=started_event)
await asyncio.sleep(monitor_interval)
if __name__ == '__main__':
with event_loop() as loop:
asyncio.ensure_future(f())
loop.run_forever()
Solution
Is there any neater way I can accomplish the task?
long_proc.terminate() # kill Bash Job B
Well, sending SIGTERM is effective, but since you’re in control of what Job B runs you might prefer to have it do IPC so it gracefully shuts down after completing a processing phase, rather than being interrupted at some arbitrary point. Could be as simple as creating a sentinel /tmp file that B looks for.
A bigger issue is that you’re not doing wait4() child, like this:
long_proc.terminate()
long_proc.wait()
Instead you wrote a race with a 5-second delay to encourage the child to win. That long sleep implies wasted idle time. You could run the CPU flat out by synchronizing with exiting child.