Discussion:
[zeromq-dev] zmq REQ to multiple REP sockets
James Bardin
2011-02-08 19:39:10 UTC
Permalink
Hello,

I'm getting started with zmq, but I'm stumbling on the first problem
I wanted to solve.
I needed a RPC-like system that could connect to multiple servers, and
it the docs made it sound really easy. I would simply connect the
same REQ socket to each server, and everything would work magically.
Blocking, and simple round-robin are fine for what I need in this
situation, but It breaks when one party fails to cooperate.

I guess my question doesn't need multiple servers; since every send on
a REQ socket needs to complete a recv from a REP, what do I do when a
server hangs/disappears/is unavailable? I know how to handle this with
traditional sockets, but how do I get this to error or timeout with
zmq? Please point me to TFM if I missed something obvious.

(I'm best with python , so those are the bindings I'm using BTW, in
case there are any deficiencies in that library).

Thanks,
-jim
Pieter Hintjens
2011-02-08 19:49:48 UTC
Permalink
Hi James,

You would want to use XREQ and XREP sockets, probably, and manage
timeouts using poll. There is a fairly detailed example of an RPC
client and server in C, which can help give a design.

https://github.com/zeromq/zfl/blob/master/src/zfl_rpc.c (client)
https://github.com/zeromq/zfl/blob/master/src/zfl_rpcd.c (server)

This should be documented in Ch4 of the guide at some stage.

-
Pieter Hintjens
iMatix
Post by James Bardin
Hello,
I'm getting started with zmq, but I'm stumbling on  the first problem
I wanted to solve.
I needed a RPC-like system that could connect to multiple servers, and
it the docs made it sound really easy.  I would simply connect the
same REQ socket to each server, and everything would work magically.
Blocking, and simple round-robin are fine for what I need in this
situation, but It breaks when one party fails to cooperate.
I guess my question doesn't need multiple servers; since every send on
a REQ socket needs to complete a recv from a REP, what do I do when a
server hangs/disappears/is unavailable? I know how to handle this with
traditional sockets, but how do I get this to error or timeout with
zmq? Please point me to TFM if I missed something obvious.
(I'm best with python , so those are the bindings I'm using BTW, in
case there are any deficiencies in that library).
Thanks,
-jim
_______________________________________________
zeromq-dev mailing list
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
James Bardin
2011-02-08 21:19:08 UTC
Permalink
Post by Pieter Hintjens
Hi James,
You would want to use XREQ and XREP sockets, probably, and manage
timeouts using poll. There is a fairly detailed example of an RPC
client and server in C, which can help give a design.
https://github.com/zeromq/zfl/blob/master/src/zfl_rpc.c (client)
https://github.com/zeromq/zfl/blob/master/src/zfl_rpcd.c (server)
That would probably be the pinnacle of what I'm after, but it is
overkill for what I'm trying to solve right now. It would also take me
a while to go through the ZFL library to really comment on this (as
far as I see, there's no equivalent for the higher-language bindings).

May a simpler question to get me going;
Is there a fast way to detect a server that's not available? With a
socket, I would get a "connection refused", and move on from there.
With zmq, it lets me send(), and then hangs on recv(). I could poll
the zmq socket with a timeout, but it seems an unavailable server
should be able to fail fast.


-jim
Chuck Remes
2011-02-09 00:19:00 UTC
Permalink
Post by James Bardin
Post by Pieter Hintjens
Hi James,
You would want to use XREQ and XREP sockets, probably, and manage
timeouts using poll. There is a fairly detailed example of an RPC
client and server in C, which can help give a design.
https://github.com/zeromq/zfl/blob/master/src/zfl_rpc.c (client)
https://github.com/zeromq/zfl/blob/master/src/zfl_rpcd.c (server)
That would probably be the pinnacle of what I'm after, but it is
overkill for what I'm trying to solve right now.
Since the library does not support timeouts on REQ/REP sockets, then it isn't overkill. It's the only way available to solve your problem.
Post by James Bardin
It would also take me
a while to go through the ZFL library to really comment on this (as
far as I see, there's no equivalent for the higher-language bindings).
The python bindings should support zmq_poll(). As far as I know, those bindings support every facet of the 0mq library.

Rebuilding the zfl functionality would probably be pretty easy in python. I've seen past discussions about integrating 0mq sockets with twisted, so it may already be done for you.
Post by James Bardin
May a simpler question to get me going;
Is there a fast way to detect a server that's not available? With a
socket, I would get a "connection refused", and move on from there.
With zmq, it lets me send(), and then hangs on recv(). I could poll
the zmq socket with a timeout, but it seems an unavailable server
should be able to fail fast.
No matter how you look at it, at some point you will have to poll on the socket and implement your own timeout. The library does not have this built in yet.

In the case you mention above, zmq_connect() is an asynchronous operation. Unless it returns a non-0 value (which means it failed immediately), it will retry the connect in the background. Are you checking the return code from your calls to zmq_connect()?

I think you are confused about the fundamentals of 0mq. It's unfortunate that they are called "sockets" because people new to the library always try to use them *exactly* the same as they would a posix socket. 0mq sockets have different semantics and require a different way of approaching and solving problems.

cr
James Bardin
2011-02-09 03:53:13 UTC
Permalink
Post by Chuck Remes
Since the library does not support timeouts on REQ/REP sockets, then it isn't overkill. It's the only way available to solve your problem.
I'm starting to see that. I'm finding more examples in github, which
are helping to clarify things.
Post by Chuck Remes
The python bindings should support zmq_poll(). As far as I know, those bindings support every facet of the 0mq library.
Yes, it was just the amount of code, plus the zfl that confused me,
and being slow in c, it seemed a little daunting.
Post by Chuck Remes
No matter how you look at it, at some point you will have to poll on the socket and implement your own timeout. The library does not have this built in yet.
In the case you mention above, zmq_connect() is an asynchronous operation. Unless it returns a non-0 value (which means it failed immediately), it will retry the connect in the background. Are you checking the return code from your calls to zmq_connect()?
I think you are confused about the fundamentals of 0mq. It's unfortunate that they are called "sockets" because people new to the library always try to use them *exactly* the same as they would a posix socket. 0mq sockets have different semantics and require a different way of approaching and solving problems.
I was counting on needing zmq_poll, but you are right, and I'm still
coming to terms with the fundamentals here. I think stems from the
claims that zmq > "raw sockets", so it seems it should handle
everything I could do before, plus more.

A timeout of some sort would be great (by the sound of it, it's
planned?), at least in the higher level libraries where a lot pieces
are hidden. As a newcomer using python, what I ran into immediately
was a blocking call to recv, with no obvious way to program around it
when it blocks indefinitely, and as a defensive programmer, I always
count on that happening.

-jim
Ian Barber
2011-02-09 05:46:29 UTC
Permalink
Post by James Bardin
A timeout of some sort would be great (by the sound of it, it's
planned?), at least in the higher level libraries where a lot pieces
are hidden. As a newcomer using python, what I ran into immediately
was a blocking call to recv, with no obvious way to program around it
when it blocks indefinitely, and as a defensive programmer, I always
count on that happening.
-jim
You can pass a zmq.NOBLOCK to recv to have it return instantly if that's
what you're looking for - assuming that sufficient time has passed for a
message to be delivered to the socket you could equate a failure as a
failure of the node - using poll just lets you give it a specific timeout so
it will wait for so long before doing the same thing.

Ian
Ian Barber
2011-02-09 05:46:29 UTC
Permalink
Post by James Bardin
A timeout of some sort would be great (by the sound of it, it's
planned?), at least in the higher level libraries where a lot pieces
are hidden. As a newcomer using python, what I ran into immediately
was a blocking call to recv, with no obvious way to program around it
when it blocks indefinitely, and as a defensive programmer, I always
count on that happening.
-jim
You can pass a zmq.NOBLOCK to recv to have it return instantly if that's
what you're looking for - assuming that sufficient time has passed for a
message to be delivered to the socket you could equate a failure as a
failure of the node - using poll just lets you give it a specific timeout so
it will wait for so long before doing the same thing.

Ian
Martin Sustrik
2011-02-09 06:54:10 UTC
Permalink
Post by James Bardin
A timeout of some sort would be great (by the sound of it, it's
planned?)
Yes, timeout and auto-resend of request in case of node failure are on
the roadmap. However, nobody tried to implement that yet. Which is kind
of interesting given it's pretty low hanging fruit (no synchronisation
issues, no messy multi-component iteractions).

Martin
Pieter Hintjens
2011-02-09 08:09:52 UTC
Permalink
Post by Martin Sustrik
Yes, timeout and auto-resend of request in case of node failure are on
the roadmap. However, nobody tried to implement that yet. Which is kind
of interesting given it's pretty low hanging fruit (no synchronisation
issues, no messy multi-component iteractions).
IMO it's more complex, and there are more use cases, than you'd
imagine at first. The best strategy is therefore to document use
cases, implement them in user space (as we're doing with ZFL), remake
the best designs in other languages, and once they are well
established, try to move them (or parts of them, like heartbeating)
into the core.

-Pieter
Martin Sustrik
2011-02-09 06:54:10 UTC
Permalink
Post by James Bardin
A timeout of some sort would be great (by the sound of it, it's
planned?)
Yes, timeout and auto-resend of request in case of node failure are on
the roadmap. However, nobody tried to implement that yet. Which is kind
of interesting given it's pretty low hanging fruit (no synchronisation
issues, no messy multi-component iteractions).

Martin
Pieter Hintjens
2011-02-09 08:05:08 UTC
Permalink
Post by James Bardin
Yes, it was just the amount of code, plus the zfl that confused me,
and being slow in c, it seemed a little daunting.
I'll explain in more detail using some of the in progress docs which
describe two prototype designs (proto2, proto3). Warning, long email
coming up...

--- snippet 1 ---

Here is a basic reliable RPC design for proto2:

[diagram]

+-----------+ +-----------+ +-----------+
| | | | | |
| Client | | Client | | Client |
| | | | | |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-----------\
| XREP |
+-----------+
| |
| Queue |
| |
+-----------+
| XREP |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| XREQ | | XREQ | | XREQ |
+-----------+ +-----------+ +-----------+
| | | | | |
| Server | | Server | | Server |
| | | | | |
+-----------+ +-----------+ +-----------+


Figure # - Reliable Request-Reply
[/diagram]

## Client design

The client connects to the queue and sends requests in a synchronous
fashion. If it does not get a reply within a certain time (1 second),
it reports an error, and retries. It will retry three times, then
exit with a final message. It uses a REQ socket and zmq_poll.

## Server design

The server connects to the queue and uses the [least-recently used
routing][lru] design, i.e. connects an XREQ socket to the queue's XREP
socket and signals when ready for a new task. The server will
randomly simulate two problems when it receives a task:

1. A crash and restart while processing a request, i.e. close its
socket, block for 5 seconds, reopen its socket and restart.
2. A temporary busy wait, i.e. sleep 1 second then continue as normal.

[lru]: http://zguide.zeromq.org/chapter:all#toc46

When waiting for work, the server will send a heartbeat message (which
is an empty message) to the queue each second.

## Queue design

The queue binds to a frontend and backend socket and handles requests
and replies asynchronously on these using the LRU design. It manages
two lists of servers:

* Servers that are ready for work.
* Servers that are disabled.

It also manages a task queue.

The queue polls all sockets for input and then processes all incoming
messages. It queues tasks and distributes them to workers that are
alive. Any replies from workers are sent back to their original
clients, unless the worker is disabled, in which case the reply is
dropped.

Idle workers must signal that they are alive with a ready message or a
heartbeat, or they will be marked as disabled until they send a
message again. This is to detect blocked and disconnected workers
(since 0MQ does not report disconnections).

The queue detects a disabled worker in two ways: heartbeating, as
explained, and timeouts on request processing. If a reply does not
come back within (e.g.) 10ms, the queue marks the worker as disabled,
and retries with the next worker.

--- end snippet ---

This first design delegates a lot of the work to a queue device, which
makes client apps simpler. However it introduces an extra box on the
network, which we can eliminate if we are prepared to move more code
into the client edge.

--- snippet 2 ---

## Issues With this Design

* Why does the client RPC layer need a queue? It cannot, as designed,
have more than one request in flight at a time.

## Overview & Goals

This prototype continues the work done in proto2, turning it into a
reusable RPC layer built as two ZFL classes (zfl_rpcd, zfl_rpc). This
RPC layer exchanges zfl_msg structures (multipart messages) with no
attention to data encoding or decoding. It implements the retrying,
heartbeating, and failover mechanisms from proto2.

## RPC layer

Proto2 designs the RPC layer partially in the queue and partially in
the client, with complementary functionality in the server. In proto3
we move the queue's reliability logic into the client. This lets us
connect clients directly to servers:

[diagram]

+-----------+
| |
| Client |
| |
+-----------+
| RPC layer |
\-----+-----/
^
|
|
/---------------+---------------\
| | |
| | |
v v v
/-----------\ /-----------\ /-----------\
| | | | | |
| Server | | Server | | Server |
| | | | | |
+-----------+ +-----------+ +-----------+


Figure # - 1-hop RPC
[/diagram]

Or connect clients to queues (acting as servers), and queues (acting
as clients) to servers:

[diagram]

+-----------+
| |
| Client |
| |
+-----------+
| RPC layer |
\-----------/
^
|
|
/---------------+---------------\
| | |
| | |
v v v
/-----------\ /-----------\ /-----------\
| | | | | |
| Queue | | Queue | | Queue |
| | | | | |
+-----------+ +-----------+ +-----------+
| RPC layer | | RPC layer | | RPC layer |
\-----------/ \-----------/ \-----------/
: ^ :
: | :
|
/---------------+---------------\
| | |
| | |
v v v
/-----------\ /-----------\ /-----------\
| | | | | |
| Server | | Server | | Server |
| | | | | |
+-----------+ +-----------+ +-----------+


Figure # - N-hop RPC
[/diagram]

In fact there are two distinct RPC layers, one for clients and one for
servers. The RPC client layer would be an *internal queue device*.
That is, an application thread that uses the same model as the queue
from proto2. The RPC server layer would be an internal device that
implements the request-reply pattern and heartbeating.

## RPC Client Layer

### Overall Architecture

The RPC client layer handles a single client thread synchronously, so
does no queuing. It manages a set of servers using a least-recently
used model that combines failover with load-balancing. This assumes
that all servers are equivalent and stateless, i.e. pure "workers" in
the workload distribution sense.

The RPC client layer *could* be tuned or adapted to always use a
specific server first, allowing a primary/secondary backup scheme.
However proto3 does not do this.

We connect the single client application thread to the RPC client
thread using inproc:// socket, and we connect the RPC client thread to
the servers over TCP:

[diagram]

+-----------+
| |
| Client |
| |
+-----------+
| REQ |
\-----------/
^
| inproc://something
v
/-----------\
| REP |
+-----------+
| |
| LRU & RPC |
| |
+-----------+
| XREP |
\-----------/
^
| tcp://something
v


Figure # - RPC Client Layer
[/diagram]

Notes:

* The RPC client thread uses a REP socket to talk to client threads
since it talks to exactly one client in this prototype.

* The RPC client thread would connect outwards to servers, to keep the
'client server' semantics clear (clients connect to servers). This is
different from proto2, where servers connect to clients.

### Technical Implementation

As a technical implementation we'd use a zfl_rpc class that works as follows:

// Create thread and bind to inproc endpoint
zfl_rpc_t *rpc;
rpc = zfl_rpc_new (context);

// Connect to three servers
zfl_rpc_connect (rpc, "tpc://192.168.0.55:6061");
zfl_rpc_connect (rpc, "tpc://192.168.0.56:6061");
zfl_rpc_connect (rpc, "tpc://192.168.0.57:6061");

// Format request message as zfl_msg object
zfl_msg_t *request = zfl_msg_new ();
...

// Send message (destroy after sending) and return reply
zfl_msg_t *reply = zfl_rpc_send (rpc, &request);
if (!reply)
printf ("No service available\n");

// End RPC client thread
zfl_rpc_destroy (&rpc);

Notes:

* The server names would in practice be resolved via a name service (proto4).
* The client application talks to the RPC client thread using inproc
sockets whose names are generated automatically.

### Configuration

We will use the zfl_config layer, as used in proto1, for configuring
the RPC client layer:

// Load configuration from somewhere
zfl_config_t config = zfl_config_new (...);
// Configure RPC layer
zfl_rpc_configure (rpc, config);

## RPC Server Layer

### Overall Architecture

The RPC server layer would work much like the client layer except that
it implements the server side of proto2 (heartbeating and LRU ready
signaling):

[diagram]

^
| tcp://something
v
/-----------\
| XREP |
+-----------+
| |
| Queue |
| |
+-----------+
| REQ |
\-----------/
^
| inproc://something
v
/-----------\
| REP |
+-----------+
| |
| Server |
| |
+-----------+


Figure # - RPC Server Layer
[/diagram]

Note that clients and servers connect N-to-N, without (needing)
intermediate devices. The server queue stores requests from multiple
clients, if necessary, feeding them to the server application
one-by-one. Each client can have at most one queued request.

### Technical Implementation

Here is a mockup of the zfl_rpcd class interface:

// Create thread and bind to inproc endpoint
zfl_rpcd_t *rpcd;
rpcd = zfl_rpcd_new (context);

// Wait for request
zfl_msg_t *request = zfl_rpcd_recv (rpcd);

// Process request and prepare reply
...
zfl_msg_destroy (&request);

// Send reply (destroy after sending)
zfl_rpc_send (rpc, &reply);

// End RPC server thread
zfl_rpcd_destroy (&rpcd);

### Handshaking at Startup

We must use XREP-to-XREP sockets because we want to connect N clients
to N servers without (necessarily) an intermediary queue device.

In an XREP-to-XREP socket connection, one side of the connection must
know the identity of the other. You cannot do xrep-to-xrep flows
between two anonymous sockets since an XREP socket requires an
explicit identity. In practice this means we will need a name service
share the identities of the servers. The client will connect to the
server, then send it a message using the server's known identity as
address, and then the server can respond to the client.

In this prototype we'll use fixed, hardcoded identities for the
servers. We'll develop the name service in a later prototype.

## General Retry Mechanism

The retry mechanism is as follows:

* If there is a single server, the client will wait a certain period
(timeout) for the server to respond.
* If the server does not respond within the timeout, the client will
retry a number of times (retries).
* If there are multiple servers, the client will wait on each server
with the timeout, but will not retry the same server twice.

--- end snippet ---

And this is basically what turned into zfl_rpc and zfl_rpcd.

-Pieter

Loading...