January 29, 2011

Recipe of request-response over AMQP with Python and Java

This is a recipe to setup synchronized messaging over AMQP. The recipe source zip file contains an implementation of a Python backend broker. Along comes example Python and Java clients that send messages over AMQP to the running broker(s). The broker load can be balanced over to several processes. RabbitMQ can automagically balance the incoming requests to multiple consumers. This setup has been deployed successfully, achieving decent performance (we have benchmarked only the private implementation, our throughput is approx. 30 msgs/sec. with three backend processes).

Good presentations on concurrent messaging and AMQP are available from:

AMQP can be used as a message bus between various system components in different languages maintained by different teams.

Here are the bare bones of this recipe:

  • create a unique identifier (qid) for the request
  • create a new consumer and bind it to a temporary response queue
  • publish the message into the durable request queue
  • wait for the response on the response queue
  • close response channel
Topic exchange is the only exchange type that honors routing keys, which are essential for pairing request and response together, using the unique identifier in routing key.

Unzip the recipe (or clone the recipe repository from GitHub). Install RabbitMQ (default settings are ok), and install Python modules 'amqplib' and 'carrot' (plus 'jsonrpclib' for the tests).

Launch the RabbitMQ server.

sudo rabbitmq-server

Start the example backend broker:

cd py-src
python example_broker.py
This will launch a daemon that listens for messages on a specific AMQP channel. A "request" is a thin JSON wrapper. If the request includes a request id (qid), the daemon will return the current time over the AMQP. If the request is not identified, the time will be printed instead.

Run the example requests in Python and Java, while the example_broker is running:

cd py-src
python example_request.py
Output:
Response from AMQP: 
{u'msg': u'Sat Jan 29 20:43:36 2011'}
Run the same requests from Java:
cd java-src
java -cp .:build:lib/json.jar:lib/rabbitmq-client-1.8.1.jar:lib/commons-logging.jar:lib/commons-io.jar \
  example.ExampleRequest
Output:
29.1.2011 21:11:01 recipe.amqpbus.AMQPRequestResponseImpl newConnection
INFO: ExampleBroker connected to RabbitMQ @ 127.0.0.1:5672/
29.1.2011 21:11:01 recipe.amqpbus.AMQPPublisher send
INFO: publishing query to ExampleBroker_req with binding key: example.request
{"q":{"q":"hello AMQP backend, I am Java"}}
-----------------------------------------
29.1.2011 21:11:01 recipe.amqpbus.AMQPPublisher send
INFO: publishing query to ExampleBroker_req with binding key: example.request.q538
{"q":{"q":"time can you get me please?"},"qid":"q538"}
29.1.2011 21:11:01 recipe.amqpbus.AMQPConsumer receive
INFO: amq.gen-dyK/PZQn/brNl9jTR/tlAg== received message: 
{"msg": "Sat Jan 29 21:11:01 2011"}
Response from AMQP: {"msg":"Sat Jan 29 21:11:01 2011"}

The example_broker output should print the events when it receives the requests: Output:

  consumer received message: 
  {'exchange': u'TestExchange', 
    'consumer_tag': u'__main__.ExampleBroker-e7175ec0-cd1f-4d2e-973b-1eeb42d4071d', 
    'routing_key': u'example.request.*', 
    'redelivered': False, 
    'delivery_tag': 2, 
    'channel': <amqplib.client_0_8.channel.Channel object at 0x1007bbad0>}
  -------------------
  received request 355:
  what time is it?
  -------------------
  response to TestExchange with routing_key: example.response.355, message: 
  {"msg": "Sat Jan 29 20:47:59 2011"}
  using channel_id: 3
  Channel open
  Closed channel #3

You should be able to pick up the ingredients and integrate them to your codebase as you see fit.