Discussion:
[zeromq-dev] Pub/sub for a logger and multiprocessing
Frédéric
2016-07-22 15:56:18 UTC
Hi!

I'm new to zeromq. I plan to use it for my multi-legged robot Python
framework (Py4bot¹).

As a first exercise, I wrote a custom logger which is a singleton, shared
across the application. As I'm switching from threads to processes, I'm
trying to use the zmq logger handler PUBHandler, to be able to remotely
receive logs from all processes, but it does not work out of the box in my
case...

In the logger __init__() method, I create a sub-process and
instantiate a zmq forwarder device, as explained here:

https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/devices/forwarder.html

So this device is created only once, at the first call to the logger.

Then, in my app, I create sub-processes using Python's
multiprocessing.Process class, but I get a zmq error:




¹ http://www.py4bot.org
--
Frédéric
Frédéric
2016-07-22 16:01:17 UTC
(sorry for the truncated mail!)

Hi!

I'm new to zeromq. I plan to use it for my multi-legged robot Python
framework (Py4bot¹).

As a first exercise, I wrote a custom logger which is a singleton, shared
across the application. As I'm switching from threads to processes, I'm
trying to use the zmq logger handler PUBHandler, to be able to remotely
receive logs from all processes, but it does not work out of the box in my
case...

In the logger __init__() method, I create a sub-process and
instantiate a zmq forwarder device, as explained here:

https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/devices/forwarder.html

So this device is created only once, at the first call to the logger.

Then, in my app, I create sub-processes using Python's
multiprocessing.Process class, but I get a zmq error:

ZMQError: Address already in use

I don't understand what's going on. Could somebody point me in the right
direction? Here is my sample code:

---------------------------------------

import time
import multiprocessing
import logging

import zmq
import zmq.utils.strtypes

FRONTEND_PORT = 5559
BACKEND_PORT = 5560


class Singleton(type):
    def __init__(self, *args, **kwargs):
        super(Singleton, self).__init__(*args, **kwargs)
        self._instance = None

    def __call__(self, *args, **kwargs):
        if self._instance is None:
            self._instance = super(Singleton, self).__call__(*args, **kwargs)

        return self._instance


class ZmqPubHandler(logging.Handler):
    def __init__(self):
        super(ZmqPubHandler, self).__init__()
        context = zmq.Context.instance()
        self._socket = context.socket(zmq.PUB)
        self._socket.connect("tcp://localhost:%d" % FRONTEND_PORT)

    def emit(self, record):
        try:
            bmsg = zmq.utils.strtypes.cast_bytes(self.format(record))
        except Exception:
            self.handleError(record)
            return

        self._socket.send_multipart(("logger", bmsg))


class Log(object):
    __metaclass__ = Singleton

    def __init__(self):
        super(Log, self).__init__()

        print "init Log"
        self._logger = logging.getLogger("test")
        handler = ZmqPubHandler()
        self._logger.addHandler(handler)

        multiprocessing.Process(target=self._device).start()

    def _device(self):
        context = zmq.Context.instance()

        # Socket facing clients
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://*:%d" % FRONTEND_PORT)

        frontend.setsockopt(zmq.SUBSCRIBE, "")

        # Socket facing services
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://*:%d" % BACKEND_PORT)

        zmq.device(zmq.FORWARDER, frontend, backend)  # blocking call

    def debug(self, *args, **kwargs):
        self._logger.debug(*args, **kwargs)


class A:
    def loop(self):
        while True:
            print "A", time.time()
            Log().debug(time.time())
            time.sleep(1)

class B:
    def loop(self):
        while True:
            print "B", time.time()
            Log().debug(time.time())
            time.sleep(1)


if __name__ == "__main__":
    print "launch A..."
    multiprocessing.Process(target=A().loop).start()
    print "launch B..."
    multiprocessing.Process(target=B().loop).start()

    while True:
        time.sleep(0.001)

    print "end"

------------------------------------------------------
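
One reading of the sample above: Log() is first called inside each child
process, so the singleton is built once per process, not once per
application, and each copy starts its own forwarder on the same ports. A
minimal sketch of the alternative, assuming the forwarder is started once
from the parent before any worker exists. It is written for Python 3, uses
zmq.proxy() in place of zmq.device(), and the names run_forwarder, worker
and main are hypothetical, not taken from the original code:

```python
import logging
import multiprocessing
import time

import zmq

FRONTEND_PORT = 5559  # log handlers (publishers) connect here
BACKEND_PORT = 5560   # remote log readers (subscribers) connect here


def run_forwarder():
    """Relay everything received on the frontend to the backend."""
    context = zmq.Context()
    frontend = context.socket(zmq.SUB)
    frontend.bind("tcp://*:%d" % FRONTEND_PORT)
    frontend.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to every topic
    backend = context.socket(zmq.PUB)
    backend.bind("tcp://*:%d" % BACKEND_PORT)
    zmq.proxy(frontend, backend)  # blocking call


class ZmqPubHandler(logging.Handler):
    """Logging handler that only connects; it never starts a device."""

    def __init__(self):
        super().__init__()
        self._socket = zmq.Context.instance().socket(zmq.PUB)
        # Do not wait forever at exit if the forwarder is gone.
        self._socket.setsockopt(zmq.LINGER, 1000)
        self._socket.connect("tcp://localhost:%d" % FRONTEND_PORT)

    def emit(self, record):
        try:
            bmsg = self.format(record).encode("utf-8")
        except Exception:
            self.handleError(record)
            return
        self._socket.send_multipart([b"logger", bmsg])


def worker(name, count=3):
    """Hypothetical worker: builds its handler inside the child process."""
    logger = logging.getLogger("test")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(ZmqPubHandler())
    for _ in range(count):
        logger.debug("%s %s", name, time.time())
        time.sleep(0.1)


def main():
    # Start the single forwarder from the parent, before any worker.
    forwarder = multiprocessing.Process(target=run_forwarder, daemon=True)
    forwarder.start()
    workers = [multiprocessing.Process(target=worker, args=(n,)) for n in "AB"]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    forwarder.terminate()
    forwarder.join()


if __name__ == "__main__":
    main()
```

Only run_forwarder() ever binds the two ports here; the workers only
connect, so a second bind on the same address should not happen.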

I also found some special Python objects (Device, Proxy, ProcessDevice...),
but I don't see how to use them.

Thanks for your help!

Best,

¹ http://www.py4bot.org
--
Frédéric
Frédéric
2016-07-23 16:40:03 UTC
Ok, I finally got something working!

I attached my code (hope it is allowed by the list robot). Feel free to
comment...

But in this example, I had to use this code for the forwarder:

----------------------------------------------

context = zmq.Context.instance()

# Socket facing clients
frontend = context.socket(zmq.SUB)
frontend.bind("tcp://*:%d" % FRONTEND_PORT)

frontend.setsockopt(zmq.SUBSCRIBE, "")

# Socket facing services
backend = context.socket(zmq.PUB)
backend.bind("tcp://*:%d" % BACKEND_PORT)

zmq.device(zmq.FORWARDER, frontend, backend)

----------------------------------------------

(launched as the target of a multiprocessing.Process).

If I try to use a zmq ProcessDevice, which seems to be more elegant, I
get:

ZMQError: Invalid argument

on the bind_out().

Here is the code:

----------------------------------------------

pd = zmq.devices.ProcessDevice(zmq.FORWARDER, zmq.SUB, zmq.PUB)
pd.bind_in("tcp://*:%d" % FRONTEND_PORT)
pd.setsockopt_in(zmq.SUBSCRIBE, "")
pd.bind_out("tcp://*.%d" % BACKEND_PORT)
pd.start()

----------------------------------------------

Where is the mistake? Looking at the source, it looks the same as the
code above...
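
The two bind calls differ by one character: bind_in() is given
"tcp://*:%d" while bind_out() is given "tcp://*.%d", with a dot where the
colon should be. That leaves the endpoint without a port separator, which
would be consistent with the "Invalid argument" error. A rough sketch of a
check that catches this before the string reaches bind_out(); the helper
is_valid_tcp_endpoint is hypothetical, is not part of pyzmq, and is much
simpler than libzmq's own address parsing:

```python
import re

# Hypothetical helper: accepts only the simple tcp://host:port form.
_TCP_ENDPOINT = re.compile(r"^tcp://(\*|[A-Za-z0-9.\-]+):(\d+)$")


def is_valid_tcp_endpoint(endpoint):
    """Return True if endpoint looks like tcp://<host or *>:<port>."""
    return _TCP_ENDPOINT.match(endpoint) is not None


BACKEND_PORT = 5560

print(is_valid_tcp_endpoint("tcp://*:%d" % BACKEND_PORT))  # colon: accepted
print(is_valid_tcp_endpoint("tcp://*.%d" % BACKEND_PORT))  # dot: rejected
```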

Lastly, I would also like to know the difference between zmq.Context() and
zmq.Context.instance(). When should I use one or the other?
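
As far as I understand pyzmq, zmq.Context.instance() hands back one shared
context per process, created on the first call, whereas zmq.Context()
builds a new, independent context every time. A small sketch of that
difference (Python 3; the variable names are only illustrative):

```python
import zmq

# Context.instance() returns the same process-wide object on every call.
shared_a = zmq.Context.instance()
shared_b = zmq.Context.instance()
print(shared_a is shared_b)  # True

# Context() creates a separate context that the caller owns.
private = zmq.Context()
print(private is shared_a)  # False

# A context you created yourself should also be terminated by you.
private.term()
```

The shared instance is convenient when several parts of one process need
the same context (inproc sockets, for example, must share one), while an
explicit zmq.Context() makes ownership and shutdown easier to follow.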

Thanks,
--
Frédéric