
Producing Content Indefinitely In A Separate Thread For All Connections?

I have a Twisted project which seeks to essentially rebroadcast collected data over TCP in JSON. I essentially have a USB library which I need to subscribe to and synchronously rea

Solution 1:

One thing you probably want to do is preserve the usual protocol/transport separation. Even though you need a thread with a long-running loop, you can hide this from the protocol. The benefit is the same as usual: the protocol becomes easier to test, and if you ever manage to have a non-threaded implementation of reading the USB events, you can just change the transport without changing the protocol.

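from threading import Thread

class USBThingy(Thread):
    def __init__(self, reactor, device, protocol):
        # Thread.__init__ must run before start() can be called.
        Thread.__init__(self)
        self._reactor = reactor
        self._device = device
        self._protocol = protocol

    def run(self):
        # Runs in the worker thread, so blocking reads do not stall the reactor.
        while True:
            for line in self._device.streamData():
                self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)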

The use of callFromThread is part of what makes this solution usable. It makes sure the usbStreamLineReceived method gets called in the reactor thread rather than in the thread that's reading from the USB device. So from the perspective of that protocol object, there's nothing special going on with respect to threading: it just has its method called once in a while when there's some data to process.
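
To make the hand-off concrete, here is a stand-alone sketch of the idea behind callFromThread, using only the standard library. QueueReactor and demo are hypothetical names invented for this illustration and are not Twisted's implementation; the point is only that the worker thread enqueues calls and the loop thread is the one that runs them.

```python
import queue
import threading


class QueueReactor:
    """Hypothetical stand-in for a reactor; NOT Twisted's implementation."""

    def __init__(self):
        self._calls = queue.Queue()

    def callFromThread(self, func, *args):
        # Safe to call from any thread: it only enqueues the call.
        self._calls.put((func, args))

    def run_pending(self):
        # Called from the "reactor" thread: run everything queued so far.
        while True:
            try:
                func, args = self._calls.get_nowait()
            except queue.Empty:
                return
            func(*args)


def demo():
    reactor = QueueReactor()
    seen = []  # (line, name of the thread the callback ran in)

    def line_received(line):
        seen.append((line, threading.current_thread().name))

    def reader():
        # Pretend these lines came from a blocking USB read.
        for line in ["a", "b", "c"]:
            reactor.callFromThread(line_received, line)

    worker = threading.Thread(target=reader, name="usb-reader")
    worker.start()
    worker.join()
    reactor.run_pending()  # callbacks run here, in the calling thread
    return seen
```

Every entry returned by demo() records the name of the thread that called run_pending, never "usb-reader", which is the property the protocol relies on.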

Your protocol then just needs to implement usbStreamLineReceived somehow, and implement your other application-specific logic, like keeping a list of observers:

class SomeUSBProtocol(object):
    def __init__(self):
        self.observers = []

    def usbStreamLineReceived(self, line):
        data = MyBrandSpankingNewUSBDeviceData(line)
        # broadcast the data; iterate over a copy so observers can unregister
        for obs in self.observers[:]:
            obs(data)

And then observers can register themselves with an instance of this class and do whatever they want with the data:

from twisted.internet.protocol import Protocol

class USBObserverThing(Protocol):
    def connectionMade(self):
        self.factory.usbProto.observers.append(self.emit)

    def connectionLost(self, reason):
        # Twisted passes the reason for the disconnect as an argument.
        self.factory.usbProto.observers.remove(self.emit)

    def emit(self, data):
        # parse the data, convert to JSON
        output = convertDataToJSON(data)
        self.transport.write(output)
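
The answer leaves convertDataToJSON undefined. A minimal sketch of what it might look like follows, assuming each reading can be represented as a plain dict of JSON-serializable values and that clients expect one JSON document per line; both are assumptions made for illustration, not details from the original. The result is encoded to bytes because transport.write expects bytes on Python 3.

```python
import json


def convertDataToJSON(data):
    """Hypothetical helper: encode one reading as a newline-terminated JSON line."""
    # Assumes `data` is a plain dict of JSON-serializable values.
    # sort_keys keeps the output stable, which makes it easier to test.
    return (json.dumps(data, sort_keys=True) + "\n").encode("utf-8")
```

The trailing newline gives clients a simple way to find message boundaries in the TCP stream.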

Hook it all together:

from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory

usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()

factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()

You can imagine a better observer register/unregister API (like one using actual methods instead of direct access to that list). You could also imagine giving the USBThingy a method for shutting down so SomeUSBProtocol could control when it stops running (so your process will actually be able to exit).
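
One way those two suggestions might look is sketched below. ObserverRegistry, StoppableUSBThread, and their method names are hypothetical, and the reactor argument is assumed to be any object with a callFromThread method, such as the Twisted reactor. The stop flag is only checked between lines, so a read that blocks forever would still keep the thread alive; marking the thread as a daemon is a fallback so the process can exit anyway.

```python
import threading


class ObserverRegistry:
    """Hypothetical replacement for direct access to the observers list."""

    def __init__(self):
        self._observers = []

    def addObserver(self, callback):
        self._observers.append(callback)

    def removeObserver(self, callback):
        # Ignore callbacks that were never registered or already removed.
        if callback in self._observers:
            self._observers.remove(callback)

    def broadcast(self, data):
        # Iterate over a copy so observers may unregister during delivery.
        for callback in self._observers[:]:
            callback(data)


class StoppableUSBThread(threading.Thread):
    """Reader thread that checks a stop flag between lines."""

    def __init__(self, reactor, device, callback):
        # daemon=True: a blocked read will not keep the process alive.
        super().__init__(daemon=True)
        self._reactor = reactor
        self._device = device
        self._callback = callback
        self._stopRequested = threading.Event()

    def stop(self):
        # Safe to call from any thread.
        self._stopRequested.set()

    def run(self):
        while not self._stopRequested.is_set():
            for line in self._device.streamData():
                if self._stopRequested.is_set():
                    return
                self._reactor.callFromThread(self._callback, line)
```

With Twisted, stop could be registered to run at shutdown via reactor.addSystemEventTrigger("before", "shutdown", thread.stop), so the reader winds down when the reactor does.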
