Is It Possible To Run Only A Single Step Of The Asyncio Event Loop
I'm working on a simple graphical network application, using asyncio and tkinter. I'm running into the problem of combining the asyncio event loop with Tk's mainloop. If possible,
Solution 1:
The lack of a public method like loop.run_once()
is intentional.
Not every supported event loop has a method to iterate a single step. Often the underlying API has methods for creating an event loop and running it forever, but emulating a single step may be very inefficient.
If you really need it, you can easily implement single-step iteration:
import asyncio
def run_once(loop):
    """Run a single iteration of *loop* and then return.

    ``loop.stop`` is queued behind whatever callbacks are already ready, so
    ``run_forever()`` processes that one batch and exits instead of blocking.

    Args:
        loop: An asyncio event loop that is not currently running.
    """
    loop.call_soon(loop.stop)
    loop.run_forever()
# Drive the event loop manually, one iteration per pass.
# The loop is created explicitly because a bare asyncio.get_event_loop()
# call is deprecated when no loop is running or set.
loop = asyncio.new_event_loop()
try:
    for i in range(100):
        print('Iteration', i)
        run_once(loop)
finally:
    # Release the loop's resources once the demo is finished.
    loop.close()
Solution 2:
Take a look at this example.
import asyncio
from tkinter import *
class asyncTk(Tk):
    """Tk root window that can be awaited from a coroutine.

    Awaiting an instance keeps refreshing the window with ``update()`` until
    the user closes it, yielding after every refresh.
    """

    def __init__(self):
        super().__init__()
        # Cleared by on_closing(); __await__ loops while this is true.
        self.running = True
        self.protocol("WM_DELETE_WINDOW", self.on_closing)

    def on_closing(self):
        """Handle the window-close request: stop the await loop and destroy the window."""
        self.running = False
        self.destroy()

    def __await__(self):
        """Refresh the window until it is closed, yielding after each refresh."""
        while self.running:
            self.update()
            # A bare yield suspends this awaitable so the awaiting task's
            # event loop can run other work before the next refresh.
            yield


async def asd():
    """Print the numbers 1 through 9, sleeping one second before each."""
    for x in range(1, 10):
        await asyncio.sleep(1)
        print(x)
async def main():
    """Open the window and run the counter task until the window is closed."""
    w = asyncTk()
    # Keep a reference to the task: the event loop only holds weak references
    # to tasks, so an unreferenced one may be garbage-collected mid-run.
    counter_task = asyncio.create_task(asd())
    await w
    # The window is gone; stop the counter instead of leaving it pending.
    counter_task.cancel()


asyncio.run(main())
Solution 3:
I use the following approach to create my own run_once()
and run_forever()
functions.
Here's a simplified example:
import asyncio
async def worker(**kwargs):
    """Announce the worker's start, then pause for its configured delay.

    Keyword Args:
        id: Label printed after ``'start: '``; defaults to ``'0.0.0.0.0.0'``.
            It is concatenated to a string, so it must be a ``str``.
        time: Value passed to ``asyncio.sleep``; defaults to ``1``.
    """
    # Local names chosen so the ``id`` builtin and ``time`` module name are
    # not shadowed; the keyword names callers pass are unchanged.
    worker_id = kwargs.get('id', '0.0.0.0.0.0')
    delay = kwargs.get('time', 1)
    try:
        # Do stuff.
        print('start: ' + worker_id)
    finally:
        # The pause runs even if the work above raises.
        await asyncio.sleep(delay)
async def worker_forever(**kwargs):
    """Run ``worker`` with the given keyword arguments in an endless loop.

    The coroutine never returns on its own; it ends only when ``worker``
    raises or the surrounding task is cancelled.
    """
    while True:
        await worker(**kwargs)
def init_loop(configs, forever=True):
    """Schedule one worker task per configuration on an event loop.

    Args:
        configs: Iterable of mappings, each with an ``'id'`` and a ``'time'``
            key; both are forwarded to the worker as keyword arguments.
        forever: When true, schedule ``worker_forever`` (repeats endlessly);
            otherwise schedule a single ``worker`` run per configuration.

    Returns:
        A ``(loop, tasks)`` tuple: the event loop and the list of tasks
        created on it. The tasks do not start until the loop is run.
    """
    # Reuse the current loop when there is a usable one. Otherwise create and
    # install a fresh loop: get_event_loop() may raise when none is set, and a
    # previously closed loop cannot accept new tasks.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Both branches create tasks on ``loop`` explicitly so they are always
    # bound to the loop that is returned to the caller.
    if forever:
        tasks = [
            loop.create_task(worker_forever(id=conf['id'], time=conf['time']))
            for conf in configs
        ]
    else:
        tasks = [
            loop.create_task(worker(id=conf['id'], time=conf['time']))
            for conf in configs
        ]
    return loop, tasks
def run_once(configs):
    """Run every configured worker exactly once and print the results.

    Args:
        configs: Worker configurations, passed through to ``init_loop``.
    """
    print('RUN_ONCE')
    loop, futures = init_loop(configs, forever=False)
    # gather() preserves input order, so results line up with ``configs``.
    result = loop.run_until_complete(asyncio.gather(*futures))
    print(result)
def run_forever(configs):
    """Run the configured workers repeatedly until interrupted.

    The loop runs until it is stopped or a ``KeyboardInterrupt`` arrives;
    either way the worker tasks are cancelled and the loop is closed.

    Args:
        configs: Worker configurations, passed through to ``init_loop``.
    """
    print('RUN_FOREVER')
    loop, tasks = init_loop(configs, forever=True)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to the cleanup.
        pass
    finally:
        print("Closing Loop")
        # Cancel the endless workers and let them unwind before closing, so
        # the loop is not closed with tasks still pending.
        for task in tasks:
            task.cancel()
        if tasks:
            loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True)
            )
        loop.close()
if __name__ == '__main__':
    # Sample worker configurations: 'time' is the pause handed to the worker
    # and 'id' is the label it prints.
    configurations = [
        {'time': 5, 'id': '4'},
        {'time': 6, 'id': '5'},
        {'time': 1, 'id': '6'},
    ]  # TODO :: DUMMY
    # First run each worker a single time, then keep them running until
    # interrupted.
    run_once(configurations)
    run_forever(configurations)
Post a Comment for "Is It Possible To Run Only A Single Step Of The Asyncio Event Loop"