Discussion:
[zeromq-dev] PUB socket dropping old messages instead of new ones
Auer, Jens
2016-12-05 12:47:59 UTC
Permalink
Hi,

I am using a XPUB socket to distribute data between three sites in several channels. For our application, it would be better if the socket would drop old messages when it reaches the HWM. I searched the mailing list and found some old questions asking about this, and an answer that it has been requested but never implemented, although it should be fairly easy. So I had a quick look at the source code, but I don't think that it is as easy as expected.

The internal pipe is implemented with the assumption that each end is not modified concurrently, i.e. there are never two concurrent writes are reads. Concurrent read and write is of course expected. From what I see, this makes it hard to drop the oldest message in the queue because the queue has to be modified from the writer thread. Does anybody have an idea on how to do this? I think it could be possible to send to a command to the writer thread to drop a message, but that does not sound very efficient. If somebody could point me to the right direction I can pick up from there.

Cheers,
Jens

--
Jens Auer | CGI | Software-Engineer
CGI (Germany) GmbH & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
***@cgi.com<mailto:***@cgi.com>
Unsere Pflichtangaben gemäß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter de.cgi.com/pflichtangaben<http://de.cgi.com/pflichtangaben>.

CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI Group Inc. and its affiliates may be contained in this message. If you are not a recipient indicated or intended in this message (or responsible for delivery of this message to such person), or you think for any reason that this message may have been addressed to you in error, you may not use or copy or deliver this message to anyone else. In such case, you should destroy this message and are asked to notify the sender by reply e-mail.
Luca Boccassi
2016-12-05 13:20:01 UTC
Permalink
Have you looked into implementing that in the radio/dish sockets?

Since it's still in DRAFT state, it will be easier to change.

Changing how the internal pipe works for the existing sockets without
breaking compatibility might be hard.
Post by Auer, Jens
Hi,
I am using a XPUB socket to distribute data between three sites in several channels. For our application, it would be better if the socket would drop old messages when it reaches the HWM. I searched the mailing list and found some old questions asking about this, and an answer that it has been requested but never implemented, although it should be fairly easy. So I had a quick look at the source code, but I don't think that it is as easy as expected.
The internal pipe is implemented with the assumption that each end is not modified concurrently, i.e. there are never two concurrent writes are reads. Concurrent read and write is of course expected. From what I see, this makes it hard to drop the oldest message in the queue because the queue has to be modified from the writer thread. Does anybody have an idea on how to do this? I think it could be possible to send to a command to the writer thread to drop a message, but that does not sound very efficient. If somebody could point me to the right direction I can pick up from there.
Cheers,
Jens
--
Jens Auer | CGI | Software-Engineer
CGI (Germany) GmbH & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
Unsere Pflichtangaben gemÀß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter de.cgi.com/pflichtangaben<http://de.cgi.com/pflichtangaben>.
CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI Group Inc. and its affiliates may be contained in this message. If you are not a recipient indicated or intended in this message (or responsible for delivery of this message to such person), or you think for any reason that this message may have been addressed to you in error, you may not use or copy or deliver this message to anyone else. In such case, you should destroy this message and are asked to notify the sender by reply e-mail.
_______________________________________________
zeromq-dev mailing list
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
Auer, Jens
2016-12-12 07:36:53 UTC
Permalink
Hi Luca,

I didn't look at the Radio/Dish sockets because we use 4.1 and there is not intention to switch to 4.2 soon. I hoped to do it in the xpub class or at least without changing the internal pipe to avoid breaking other things. I am not experienced with implementing lock-free data structures (but it does sound interesting...) and I could not find any reference for this.

Usually I would use a ringbuffer implement this behavior, but when it overwrites data there is a shared access to the memory which makes it problematic for a lock-free data structure. Ringbuffers are common to implement lock-free queues, but they never overwrite. I found one library which offers a lock-free ringbuffer with overwrite behavior (liblfds), so I will take a look. I suspect that taking a dependency just for this is not desired?

While looking at the source code, I saw that dbuffer used by ypipe_conflate uses a mutex. Shouldn't that be a performance issue? I think it could be implemented with atomic_ptr.

Cheers,
Jens
--
Dr. Jens Auer | CGI | Software Engineer
CGI Deutschland Ltd. & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
***@cgi.com
Unsere Pflichtangaben gemäß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter de.cgi.com/pflichtangaben.

CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI Group Inc. and its affiliates may be contained in this message. If you are not a recipient indicated or intended in this message (or responsible for delivery of this message to such person), or you think for any reason that this message may have been addressed to you in error, you may not use or copy or deliver this message to anyone else. In such case, you should destroy this message and are asked to notify the sender by reply e-mail.
-----Original Message-----
Luca Boccassi
Sent: 05 December 2016 14:20
To: ZeroMQ development list
Subject: Re: [zeromq-dev] PUB socket dropping old messages instead of new ones
Have you looked into implementing that in the radio/dish sockets?
Since it's still in DRAFT state, it will be easier to change.
Changing how the internal pipe works for the existing sockets without breaking
compatibility might be hard.
Post by Auer, Jens
Hi,
I am using a XPUB socket to distribute data between three sites in several
channels. For our application, it would be better if the socket would drop old
messages when it reaches the HWM. I searched the mailing list and found some old
questions asking about this, and an answer that it has been requested but never
implemented, although it should be fairly easy. So I had a quick look at the source
code, but I don't think that it is as easy as expected.
Post by Auer, Jens
The internal pipe is implemented with the assumption that each end is not
modified concurrently, i.e. there are never two concurrent writes are reads.
Concurrent read and write is of course expected. From what I see, this makes it
hard to drop the oldest message in the queue because the queue has to be modified
from the writer thread. Does anybody have an idea on how to do this? I think it
could be possible to send to a command to the writer thread to drop a message, but
that does not sound very efficient. If somebody could point me to the right direction
I can pick up from there.
Post by Auer, Jens
Cheers,
Jens
--
Jens Auer | CGI | Software-Engineer
CGI (Germany) GmbH & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
Unsere Pflichtangaben gemäß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter
de.cgi.com/pflichtangaben<http://de.cgi.com/pflichtangaben>.
Post by Auer, Jens
CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI
Group Inc. and its affiliates may be contained in this message. If you are not a
recipient indicated or intended in this message (or responsible for delivery of this
message to such person), or you think for any reason that this message may have
been addressed to you in error, you may not use or copy or deliver this message to
anyone else. In such case, you should destroy this message and are asked to notify
the sender by reply e-mail.
Post by Auer, Jens
_______________________________________________
zeromq-dev mailing list
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
Luca Boccassi
2016-12-13 17:50:01 UTC
Permalink
With regards to lock-free and wait-free concurrent data structures I'd
say the king is userspace rcu:

http://liburcu.org/
https://github.com/urcu/userspace-rcu
https://lwn.net/Articles/573424/

I've used it extensively at work and it is impressively powerful, when
used in the right context.

But as you noted I don't think we want hard dependencies. The only
mandatory dependency libzmq has at the moment is against the standard
library and nothing else.

If you want to implement something like this, I would suggest to do
something similar to what we have for the pollers or for security.
Multiple implementations with one chosen at build-time. The current
implementation will be the default, and if configured and if the library
is available an alternative one will be built instead.

This is the only way to be able to maintain a sane multiplatform
support.

Note that we only backport bug fixes to stable trees, so a change like
this most likely doesn't qualify :-)
Post by Auer, Jens
Hi Luca,
I didn't look at the Radio/Dish sockets because we use 4.1 and there is not intention to switch to 4.2 soon. I hoped to do it in the xpub class or at least without changing the internal pipe to avoid breaking other things. I am not experienced with implementing lock-free data structures (but it does sound interesting...) and I could not find any reference for this.
Usually I would use a ringbuffer implement this behavior, but when it overwrites data there is a shared access to the memory which makes it problematic for a lock-free data structure. Ringbuffers are common to implement lock-free queues, but they never overwrite. I found one library which offers a lock-free ringbuffer with overwrite behavior (liblfds), so I will take a look. I suspect that taking a dependency just for this is not desired?
While looking at the source code, I saw that dbuffer used by ypipe_conflate uses a mutex. Shouldn't that be a performance issue? I think it could be implemented with atomic_ptr.
Cheers,
Jens
--
Dr. Jens Auer | CGI | Software Engineer
CGI Deutschland Ltd. & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
Unsere Pflichtangaben gemäß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter de.cgi.com/pflichtangaben.
CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI Group Inc. and its affiliates may be contained in this message. If you are not a recipient indicated or intended in this message (or responsible for delivery of this message to such person), or you think for any reason that this message may have been addressed to you in error, you may not use or copy or deliver this message to anyone else. In such case, you should destroy this message and are asked to notify the sender by reply e-mail.
-----Original Message-----
Luca Boccassi
Sent: 05 December 2016 14:20
To: ZeroMQ development list
Subject: Re: [zeromq-dev] PUB socket dropping old messages instead of new ones
Have you looked into implementing that in the radio/dish sockets?
Since it's still in DRAFT state, it will be easier to change.
Changing how the internal pipe works for the existing sockets without breaking
compatibility might be hard.
Post by Auer, Jens
Hi,
I am using a XPUB socket to distribute data between three sites in several
channels. For our application, it would be better if the socket would drop old
messages when it reaches the HWM. I searched the mailing list and found some old
questions asking about this, and an answer that it has been requested but never
implemented, although it should be fairly easy. So I had a quick look at the source
code, but I don't think that it is as easy as expected.
Post by Auer, Jens
The internal pipe is implemented with the assumption that each end is not
modified concurrently, i.e. there are never two concurrent writes are reads.
Concurrent read and write is of course expected. From what I see, this makes it
hard to drop the oldest message in the queue because the queue has to be modified
from the writer thread. Does anybody have an idea on how to do this? I think it
could be possible to send to a command to the writer thread to drop a message, but
that does not sound very efficient. If somebody could point me to the right direction
I can pick up from there.
Post by Auer, Jens
Cheers,
Jens
--
Jens Auer | CGI | Software-Engineer
CGI (Germany) GmbH & Co. KG
Rheinstraße 95 | 64295 Darmstadt | Germany
T: +49 6151 36860 154
Unsere Pflichtangaben gemäß § 35a GmbHG / §§ 161, 125a HGB finden Sie unter
de.cgi.com/pflichtangaben<http://de.cgi.com/pflichtangaben>.
Post by Auer, Jens
CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to CGI
Group Inc. and its affiliates may be contained in this message. If you are not a
recipient indicated or intended in this message (or responsible for delivery of this
message to such person), or you think for any reason that this message may have
been addressed to you in error, you may not use or copy or deliver this message to
anyone else. In such case, you should destroy this message and are asked to notify
the sender by reply e-mail.
Post by Auer, Jens
_______________________________________________
zeromq-dev mailing list
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
_______________________________________________
zeromq-dev mailing list
https://lists.zeromq.org/mailman/listinfo/zeromq-dev
Loading...