Skip to content Skip to sidebar Skip to footer

How To Send Data Periodically With Asyncio.protocol Subclass

I have asyncio.Protocol subclass class MyProtocol(Protocol): def __init__(self, exit_future): self.exit_future = exit_future def connection_made(self, transport):

Solution 1:

how can I send some data on some external event occurs?

The easiest way is to spawn the coroutine in connection_made and leave it to handle the event in the "background":

defconnection_made(self, transport):
    self.transport = transport
    loop = asyncio.get_event_loop()
    self._interesting_events = asyncio.Queue()
    self.monitor = loop.create_task(self._monitor_impl())

defconnection_lost(self, exc):
    self.exit_future.set_result(True)
    self.monitor.cancel()

asyncdef_monitor_impl(self):
    whileTrue:
        # this can also await asyncio.sleep() or whatever is needed
        event = await self._interesting_events.get()
        self.transport.write(...)

Note that in the long run it might be worth it to replace create_connection with open_connection and use the streams API from the ground up. That way you can use coroutines all the way without worrying about the callback/coroutine impedance mismatch.

On an unrelated note, try followed by except: pass is an anti-pattern - consider catching a specific exception instead, or at least logging the exception.

Post a Comment for "How To Send Data Periodically With Asyncio.protocol Subclass"