Discussion:
[zeromq-dev] Inproc endpoint and zmq_term.
Victor Vlasenko
2011-01-24 08:27:32 UTC
Permalink
Hi All,

The blocking behavior of zmq_term is very good for correctly flushing
all messages on termination over network connections. However, it does
not seem to work so well for inproc connections.

For inproc connections with a blocking socket.recv(), zmq_term might
deliver TERM to our blocking recv before all messages have been flushed
to the socket. The issue is the order of operations: currently there is
no defined order between flushing the messages and making the blocking
operation exit with TERM. If, however, ZeroMQ first flushed all messages
to all sockets and only then delivered TERM to all blocking
socket.recv() operations, messages would not be lost on inproc
connections.

Given that inproc sockets share a buffer, the correct behavior would be
to not deliver TERM to recv() operations until this shared buffer is
empty.

The code that illustrates the issue is provided below; the problematic
place is marked with !!!...!!!

Victor

import org.zeromq.ZMQ;

import java.util.concurrent.atomic.AtomicInteger;

public class Balancer {
    // A volatile int is not enough here: ++ is not atomic, so ten
    // workers could under-count. AtomicInteger keeps the total exact.
    private final AtomicInteger totalHandled = new AtomicInteger();

    public class WorkerThread extends Thread {
        private final ZMQ.Context ctx;
        private final int threadNo;
        private int handled = 0;

        public WorkerThread(int threadNo, ZMQ.Context ctx) {
            super("Worker-" + threadNo);
            this.threadNo = threadNo;
            this.ctx = ctx;
        }

        public void run() {
            try {
                // Create PULL socket
                ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
                // Set the high water mark to 2, so that once this peer
                // has 2 messages in its buffer, ZeroMQ skips to the
                // next worker
                socket.setHWM(2);
                // Connect to in-process endpoint
                socket.connect("inproc://workers");

                while (true) {
                    byte[] msg;
                    try {
                        // Get work piece
                        msg = socket.recv(0);
                    } catch (Exception e) {
                        // The ZeroMQ wrapper throws an exception
                        // when the context is terminated.
                        // !!!We will lose messages, because we might
                        // get TERM earlier than we get all the messages!!!
                        socket.close();
                        break;
                    }
                    handled++;
                    totalHandled.incrementAndGet();
                    System.out.println(getName()
                            + " handled work piece " + msg[0]);
                    // Handle work, by sleeping for some time
                    int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                    Thread.sleep(sleepTime);
                }
                System.out.println(getName()
                        + " handled count " + handled);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void run() {
        try {
            // Create ZeroMQ context
            ZMQ.Context ctx = ZMQ.context(1);
            // Create PUSH socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
            // Bind socket to in-process endpoint
            socket.bind("inproc://workers");

            // Create worker thread pool
            Thread[] threads = new Thread[10];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new WorkerThread(i, ctx);
                threads[i].start();
            }

            // "Send" the work to workers
            for (int i = 0; i < 100; i++) {
                System.out.println("Sending work piece " + i);
                byte[] msg = new byte[1];
                msg[0] = (byte) i;
                socket.send(msg, 0);
            }
            socket.close();

            // Terminate ZeroMQ context
            ctx.term();

            System.out.println("Total handled " + totalHandled.get());
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Balancer balancer = new Balancer();
        balancer.run();
    }
}
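Until zmq_term drains the inproc buffer before delivering TERM, an application-level workaround is to send one sentinel ("poison pill") message per worker and join the worker threads before terminating the context; since the pipe is FIFO, every real message is consumed before any sentinel. A minimal sketch of that pattern follows, with a plain BlockingQueue standing in for the inproc pipe so it runs without ZeroMQ; the SentinelShutdown and STOP names are just for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SentinelShutdown {
    // Sentinel ("poison pill") telling a worker to stop.
    private static final byte[] STOP = new byte[0];

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> pipe = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        int workers = 10;

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        byte[] msg = pipe.take();   // blocking "recv"
                        if (msg == STOP) break;     // drain until sentinel
                        handled.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        // "Send" the work
        for (int i = 0; i < 100; i++) {
            pipe.put(new byte[] { (byte) i });
        }

        // One sentinel per worker: each worker consumes exactly one,
        // and FIFO order guarantees all work is consumed first.
        for (int i = 0; i < workers; i++) {
            pipe.put(STOP);
        }

        // Only after every worker has drained and exited is it safe
        // to tear the transport down (ctx.term() in the ZeroMQ case).
        for (Thread t : threads) {
            t.join();
        }

        System.out.println("Total handled " + handled.get());
    }
}
```

Applied to the Balancer above, this would mean pushing ten zero-length messages after the 100 work pieces, having each worker break out of its loop on msg.length == 0, and calling ctx.term() only after joining all worker threads.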
Victor Vlasenko
2011-01-24 13:20:47 UTC
Permalink
Hi All,

The blocking behavior of zmq_term is very good for correctly flushing
all messages on termination for network connections. However, it
currently does not seem to work so well for inproc connections.

For inproc connections with a blocking socket.recv(), zmq_term might
deliver TERM to our blocking recv before all messages have been flushed
to the socket. The issue is the order of operations: currently there is
no defined order between flushing the messages and making the blocking
operation exit with TERM. If, however, ZeroMQ first flushed all messages
to all sockets and only then delivered TERM to all blocking
socket.recv() operations, messages would not be lost on inproc
connections.

Given that inproc sockets share a buffer, the correct behavior would be
to not deliver TERM to recv() operations until this shared buffer is
empty.

The code that illustrates the issue is provided here:
http://pastebin.com/9x0txGzS
The total handled count output by the code is not always 100, as
expected, but at times 99 or 98, i.e. some messages are lost.

OS: Linux x64 2.6.32
ZeroMQ version: 2.1.0

Victor
