pyf.station
PyF.Station is a protocol, with a client and a server, for transferring Python generators across TCP networks. Items in the generator must be pyf.transport.Packet instances.
Best practice is to describe the flow in the first packet, which acts as a header (containing, for example, authentication data, method, target, and so on).
Errors are propagated to both ends of the connection.
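As a minimal sketch of the header convention, the snippet below builds a flow whose first item is a header and then separates that header from the payload on the receiving side. The `Packet` class here is a hypothetical stand-in (a plain `dict` subclass) so that the snippet runs without PyF installed; the real `pyf.transport.Packet` may behave differently. The helper names `with_header` and `split_header` and the header fields are illustrative assumptions, not part of the library.

```python
import itertools


class Packet(dict):
    """Hypothetical stand-in for pyf.transport.Packet (assumed dict-like)."""


def with_header(header, items):
    """Return an iterator yielding a header packet first, then the items."""
    return itertools.chain([Packet(header)], items)


def split_header(flow):
    """Consume the first packet as the header and return (header, rest)."""
    flow = iter(flow)
    header = next(flow)
    return header, flow


# Illustrative field names only; use whatever your handler expects.
items = (Packet({'num': i}) for i in range(3))
flow = with_header({'action': 'my_action', 'target': 'demo'}, items)

header, rest = split_header(flow)
payload = list(rest)
print(header['action'])
print(len(payload))
```

Keeping the header as the first item means the receiving side can decide how to treat the rest of the flow before consuming any payload.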
Examples of use
Server
Please note that the server requires tgscheduler (to spawn tasks, passing generators) and twisted.
An example:
from twisted.internet import reactor
from pyf.station import FlowServer

def sample_handler(flow, client=None):
    header = flow.next()
    print header
    for i, item in enumerate(flow):
        if not i%50:
            print i, item
    print "end of flow..."

factory = FlowServer(sample_handler)
reactor.listenTCP(8000,factory)
reactor.run()
Another example, for when you are already in a threaded environment (such as a WSGI server):
from tgscheduler import scheduler
from twisted.internet import reactor
from pyf.station import FlowServer
from pyf.transport import Packet

def sample_handler(flow, client=None):
    # the first packet of the flow is the header
    header = flow.next()
    print header
    for i, item in enumerate(flow):
        # every 50 items...
        if not i%50:
            print i, item
            # we send a message to the client
            client.message(Packet({'type': 'info',
                                   'message': 'hello ! (%s)' % i}))
    print "end of flow..."

factory = FlowServer(sample_handler)
reactor.listenTCP(8000,factory)
# run the reactor as a scheduled task, without installing signal handlers
scheduler.add_single_task(reactor.run,
                          kw=dict(installSignalHandlers=0),
                          initialdelay=0)
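Because a handler is just a function that receives a generator and a `client` keyword argument, it can be exercised without opening a socket. The following is a sketch under stated assumptions: `Packet` is a hypothetical dict-like stand-in for `pyf.transport.Packet`, `StubClient` is a hypothetical test double that only mimics the `message()` call used above, and the handler returns a tuple purely so that the result can be checked (nothing in this documentation says the server uses a handler's return value). The code uses `next(flow)` and single-argument `print(...)` so that it runs on both Python 2 and Python 3.

```python
class Packet(dict):
    """Hypothetical stand-in for pyf.transport.Packet (assumed dict-like)."""


class StubClient(object):
    """Hypothetical test double; it only records what message() receives."""

    def __init__(self):
        self.sent = []

    def message(self, packet):
        self.sent.append(packet)


def sample_handler(flow, client=None):
    header = next(flow)  # the first packet is the header
    count = 0
    for i, item in enumerate(flow):
        count += 1
        # every 50 items, report progress back to the client
        if not i % 50 and client is not None:
            client.message(Packet({'type': 'info',
                                   'message': 'hello ! (%s)' % i}))
    # returned only so the sketch can be checked
    return header, count


# One header packet followed by 120 data packets.
flow = iter([Packet({'action': 'demo'})] +
            [Packet({'num': i}) for i in range(120)])
client = StubClient()
header, count = sample_handler(flow, client=client)
print(count)
print(len(client.sent))
```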
Client
Example of client:
from pyf.station.client import StationClient
from pyf.transport import Packet

# host, port, waits_for_success
# (the port must match the one the server listens on)
client = StationClient('127.0.0.1', 6789, True)

def message_handler(message_packet):
    # the handler for messages that come back from the server
    print message_packet

# we register our callback
client.add_listener('message_received', message_handler)

# we generate sample packets
flow = (Packet(dict(Field1=i+1,
                    Field2=('titi', 'tata')[i%2], num=i+1,
                    Field3=(i+1)*10))
        for i in range(10000))

values = client.call(
    flow,
    header=dict(authtkt='my false auth token :)',
                action='my_action'))

# here each value is either True (meaning the message went through)
# or a packet coming back from the server.
for i, value in enumerate(values):
    if not i % 5000:
        print i
    if isinstance(value, Packet):
        print value
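The loop above mixes plain acknowledgements with packets sent back by the server. A small sketch of how the two kinds of value could be separated is shown here. The `values` iterator is simulated, since a real one would come from `client.call(...)`; `Packet` is a hypothetical dict-like stand-in for `pyf.transport.Packet`, and `summarize` is an illustrative helper name rather than a library function.

```python
class Packet(dict):
    """Hypothetical stand-in for pyf.transport.Packet (assumed dict-like)."""


def summarize(values):
    """Count acknowledgements and collect packets sent back by the server."""
    acknowledged = 0
    messages = []
    for value in values:
        if isinstance(value, Packet):
            messages.append(value)
        elif value is True:
            acknowledged += 1
    return acknowledged, messages


# Simulated result stream; a real one would come from client.call(...).
values = iter([True, True,
               Packet({'type': 'info', 'message': 'hello'}),
               True])
acknowledged, messages = summarize(values)
print(acknowledged)
print(messages[0]['message'])
```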
Module documentation
pyf.station.client
- class pyf.station.client.StationClient(host, port=6789, waits_for_success=False, separator='\r\n\x00')
The station client. To send a flow to a server running on localhost, while receiving messages:
>>> client = StationClient('127.0.0.1', 6789, waits_for_success=True)
>>> values = client.call(my_flow)
>>> for value in values:
...     print value # will print everything, while the processing on server isn't finished
To just send values without waiting for results:
>>> client = StationClient('127.0.0.1', 6789, waits_for_success=False)
>>> values = client.call(my_flow)
>>> for value in values:
...     print value # will print every message until all the data has been sent, but not afterward.
- add_listener(name, callback, *args, **kwargs)
Adds an event listener on the instance.
Parameters:
- name (unicode or str) – event name to listen for
- callback (callable) – the callable to fire when the event is emitted
Additional args and kwargs are passed to the callback when the event is fired.
To stop the callback chain, your callback should return False. All other return values are discarded.
- emit_event(name, *args, **kwargs)
Emit a named event. This will fire all the callbacks registered for the named event.
Parameters:
- name (unicode or str) – name of the event to emit
Additional args and kwargs are passed to the callbacks (before the ones that were passed to add_listener).
- remove_listener(name, func)
Removes a callback from the callback list for the given event name.
Parameters:
- name (unicode or str) – event name to listen for
- func (method) – the function of the callback to unregister
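The listener behaviour described for these three methods can be illustrated with a small stand-alone sketch. `EventSource` below is a hypothetical re-implementation written only from the descriptions above, not the library's actual code. In particular, how keyword arguments from `emit_event` and `add_listener` are merged is an assumption (here, emit-time values win on conflict).

```python
class EventSource(object):
    """Hypothetical re-implementation of the documented listener behaviour."""

    def __init__(self):
        self._listeners = {}

    def add_listener(self, name, callback, *args, **kwargs):
        self._listeners.setdefault(name, []).append((callback, args, kwargs))

    def remove_listener(self, name, func):
        self._listeners[name] = [entry
                                 for entry in self._listeners.get(name, [])
                                 if entry[0] != func]

    def emit_event(self, name, *args, **kwargs):
        for callback, extra_args, extra_kwargs in list(
                self._listeners.get(name, [])):
            merged = dict(extra_kwargs)
            merged.update(kwargs)  # assumption: emit-time kwargs win
            # emit-time args come first, then those given to add_listener
            if callback(*(args + extra_args), **merged) is False:
                break  # a callback returning False stops the chain


calls = []


def first(packet, tag):
    calls.append(('first', packet, tag))
    return False  # stop the chain: later callbacks are skipped


def second(packet):
    calls.append(('second', packet))


source = EventSource()
source.add_listener('message_received', first, 'extra')
source.add_listener('message_received', second)
source.emit_event('message_received', 'p1')  # only first runs
source.remove_listener('message_received', first)
source.emit_event('message_received', 'p2')  # now second runs
print(calls)
```

In the first emit, `first` receives the emitted packet followed by the extra argument registered with it, and its `False` return value prevents `second` from running.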
pyf.station.main
- class pyf.station.main.FlowServer(data_handler, max_items=1000, max_clients=100)
The flow server factory.
Example use:
>>> factory = FlowServer(my_handler) # handler is a function receiving a flow
...                                  # and a "client" keyword argument
...                                  # client being a FlowProtocol instance.
>>> reactor.listenTCP(8000,factory)
>>> reactor.run()