Protocol Buffers and RabbitMQ on Ubuntu 14.04 Part 2
This article is the second post in a series:
- Introduce Protocol Buffers and install the protocol buffer compiler
- Write a Producer and a Consumer to work with Protocol Buffers and RabbitMQ
In the previous article, we installed Protocol Buffers, defined a .proto file, and used the protocol buffer compiler to compile it. In this article, we continue with Step 4 and Step 5: writing a producer and a consumer that encode, transfer, and decode Protocol Buffers data through RabbitMQ. Steps 1, 2, and 3 are covered in the previous post.
To use Protocol Buffers in my system, I will:
- Step 1: Install the protocol buffer compiler on Ubuntu 14.04
- Step 2: Define the message structure in a .proto file
- Step 3: Use the protocol buffer compiler to compile the .proto file
- Step 4: Write a Producer that uses the Python protocol buffer API to encode and send messages to the RabbitMQ server
- Step 5: Write a Consumer that uses the Python protocol buffer API to receive and decode messages from the RabbitMQ server
Step 4: Write a Producer that uses the Python protocol buffer API to encode and send messages to the RabbitMQ server
- First, we need to set up the environment for this example. I will use Python 2.7 and the Pika library to work with RabbitMQ.
You will need to be familiar with virtualenv and pip. Please read about them in the post: Set Up Package Manager (PIP) And Virtual Environments in Python on Ubuntu 14.04.
virtualenv -p /usr/bin/python2.7 env
source env/bin/activate
pip install pika
- Next, we need to install the Protocol Buffers library for Python.
You can download the Protocol Buffers source code and copy its python directory into your project:
cd ..
git clone https://github.com/google/protobuf.git
cd protocol_buffers_python
cp -r ../protobuf/python .
cd python
python setup.py install
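Before moving on, it can help to confirm that both libraries are importable from inside the virtualenv. The following is a minimal sketch and not part of the original project; the helper name `installed_version` is hypothetical.

```python
import importlib


def installed_version(module_name):
    """Return the module's version string, or None if it cannot be imported."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    # Not every module exposes __version__, so fall back to a placeholder.
    return getattr(module, '__version__', 'unknown')


for name in ('pika', 'google.protobuf'):
    version = installed_version(name)
    if version is None:
        print('{}: NOT installed'.format(name))
    else:
        print('{}: {}'.format(name, version))
```

If either line reports "NOT installed", check that the virtualenv is activated and repeat the corresponding install step.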
- Next, I create a config.py file, which stores the RabbitMQ server information.
RABBIT_HOST = 'localhost'
QUEUE_TOPIC = 'topic_name'
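Hard-coded values are fine for a local test. If the broker may run on another machine, one possible variation of config.py reads the values from environment variables and falls back to the defaults above. This is an assumption for illustration, not part of the original project, and the environment variable names are hypothetical.

```python
import os

# Fall back to the article's defaults when the variables are not set.
RABBIT_HOST = os.environ.get('RABBIT_HOST', 'localhost')
QUEUE_TOPIC = os.environ.get('QUEUE_TOPIC', 'topic_name')
```

The rest of the code keeps using `cfg.RABBIT_HOST` and `cfg.QUEUE_TOPIC` unchanged.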
- Next, I will write rabbit.py to work with RabbitMQ.
import pika
import config as cfg

class Rabbit(object):
    """Thin wrapper around a blocking Pika connection."""
    def __init__(self):
        self.conn = None
        self.channel = None

    def connect(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host=cfg.RABBIT_HOST))
        self.channel = self.conn.channel()

    def close(self):
        self.conn.close()

    def send(self, topic, data):
        # Open connection
        self.connect()
        # Declare the queue to send data to
        self.channel.queue_declare(queue=topic)
        # Send data through the default exchange; the routing key is the queue name
        self.channel.basic_publish(exchange='', routing_key=topic, body=data)
        print(" [x] Sent data to RabbitMQ")
        # Close connection
        self.close()

    def receive(self, topic, callback):
        # Open connection
        self.connect()
        # Declare the queue to receive data from
        self.channel.queue_declare(queue=topic)
        print(' [*] Waiting for messages. To exit press CTRL+C')
        # Listen and receive data from the queue.
        # Note: this is the Pika 0.x signature. Pika 1.0 and later use
        # basic_consume(queue=topic, on_message_callback=callback, auto_ack=True).
        self.channel.basic_consume(callback, queue=topic, no_ack=True)
        self.channel.start_consuming()
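Pika invokes the callback passed to `receive` with four arguments: the channel, the delivery method frame, the message properties, and the raw body. To see the publish/consume flow without a running broker, here is a small in-memory stand-in. `FakeChannel` is a hypothetical test double written for this illustration; it only mimics the channel methods that rabbit.py calls (using the same Pika 0.x argument order) and is not part of Pika.

```python
from collections import defaultdict, deque


class FakeChannel(object):
    """In-memory stand-in for the few channel methods used in rabbit.py."""

    def __init__(self):
        self.queues = defaultdict(deque)   # queue name -> pending bodies
        self.consumers = {}                # queue name -> callback

    def queue_declare(self, queue):
        # Touching the key creates the queue if it does not exist yet.
        self.queues[queue]

    def basic_publish(self, exchange, routing_key, body):
        # With the default exchange (''), the routing key is the queue name.
        self.queues[routing_key].append(body)

    def basic_consume(self, consumer_callback, queue, no_ack=False):
        self.consumers[queue] = consumer_callback

    def start_consuming(self):
        # The real method blocks; this one only drains what is already queued.
        for queue, callback in self.consumers.items():
            while self.queues[queue]:
                body = self.queues[queue].popleft()
                # No method frame or properties exist here, so pass None.
                callback(self, None, None, body)


received = []


def on_message(ch, method, properties, body):
    received.append(body)


channel = FakeChannel()
channel.queue_declare(queue='topic_name')
channel.basic_publish(exchange='', routing_key='topic_name', body=b'hello')
channel.basic_consume(on_message, queue='topic_name', no_ack=True)
channel.start_consuming()
print(received)
```

The body arrives as raw bytes, which is why the consumer has to decode it before it can read any fields.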
- After that, we will write producer.py.
import pika
import users_pb2
import config as cfg
import rabbit

r = rabbit.Rabbit()
list_users = users_pb2.ListUsers()

def input_user(user):
    # Fill one User message from the keyboard (raw_input is Python 2)
    user.id = int(raw_input("Enter User ID: "))
    user.name = raw_input("Enter name: ")
    email = raw_input("Enter email address (blank for none): ")
    if email != '':
        user.email = email

def send_rabbit(data):
    # Standalone equivalent of Rabbit.send(); it is not called below
    # Connect to RabbitMQ and create channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg.RABBIT_HOST))
    channel = connection.channel()
    # Declare queue to send data
    channel.queue_declare(queue=cfg.QUEUE_TOPIC)
    # Send data
    channel.basic_publish(exchange='', routing_key=cfg.QUEUE_TOPIC, body=data)
    print(" [x] Sent data to RabbitMQ")
    connection.close()

try:
    # Add a user to the list
    input_user(list_users.user.add())
    # Encode data
    data_encode = list_users.SerializeToString()
    # Send to RabbitMQ
    r.send(cfg.QUEUE_TOPIC, data_encode)
except Exception as e:
    print("Failed to send data")
    print(e)
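`SerializeToString()` returns the message in the Protocol Buffers binary wire format, and that byte string is what travels through RabbitMQ. To make the encoding less opaque, here is a hand-written sketch of that format for a user with id 2 and name "cuong". The field numbers are assumptions (User.id = 1, User.name = 2, ListUsers.user = 1), because the .proto file from the previous post is not repeated here. The helper functions are hypothetical and only cover non-negative integers and length-delimited fields.

```python
import binascii


def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)   # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_key(field_number, wire_type):
    # Every field starts with a key: (field_number << 3) | wire_type.
    return encode_varint((field_number << 3) | wire_type)


def encode_varint_field(field_number, value):
    return encode_key(field_number, 0) + encode_varint(value)


def encode_bytes_field(field_number, payload):
    # Wire type 2: key, then payload length, then the payload itself.
    return encode_key(field_number, 2) + encode_varint(len(payload)) + payload


# Assumed field numbers: User.id = 1, User.name = 2, ListUsers.user = 1.
user = encode_varint_field(1, 2) + encode_bytes_field(2, 'cuong'.encode('utf-8'))
list_users = encode_bytes_field(1, user)
print(binascii.hexlify(list_users).decode('ascii'))  # 0a090802120563756f6e67
```

The whole message takes 11 bytes, which shows how compact the format is. In real code, keep using the generated `users_pb2` classes; this sketch only explains what they produce.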
Step 5: Write a Consumer that uses the Python protocol buffer API to receive and decode messages from the RabbitMQ server
In the final step, we write a consumer to receive the data and decode it. The content of consumer.py:
import users_pb2
import config as cfg
import rabbit

r = rabbit.Rabbit()
list_users = users_pb2.ListUsers()

def show(list_users):
    for user in list_users.user:
        print("User ID: {}".format(user.id))
        print("User Name: {}".format(user.name))
        # Only print the email if it was set
        if user.HasField('email'):
            print("Email: {}".format(user.email))

def callback_rabbit(ch, method, properties, body):
    #print("Method: {}".format(method))
    #print("Properties: {}".format(properties))
    print("\n================================================\n")
    # Decode the received bytes back into a ListUsers message
    list_users.ParseFromString(body)
    show(list_users)
    print("\n================================================\n")

try:
    # Receive data from RabbitMQ
    r.receive(cfg.QUEUE_TOPIC, callback_rabbit)
except Exception as e:
    print("Failed to receive data")
    print(e)
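`ParseFromString()` reverses the encoding: it walks the byte string, reads each field key, and then reads the value according to its wire type. The sketch below does this by hand for a sample body. The sample bytes and the field numbers (ListUsers.user = 1, User.id = 1, User.name = 2) are assumptions for illustration, since the .proto file is defined in the previous post. The helper functions are hypothetical and handle only varint and length-delimited fields.

```python
import binascii


def decode_varint(data, pos):
    """Read a base-128 varint from a bytearray; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:           # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def decode_fields(data):
    """Yield (field_number, value) pairs from an encoded message."""
    data = bytearray(data)            # indexing yields ints on Python 2 and 3
    pos = 0
    while pos < len(data):
        key, pos = decode_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:            # varint
            value, pos = decode_varint(data, pos)
        elif wire_type == 2:          # length-delimited: string, bytes, sub-message
            length, pos = decode_varint(data, pos)
            value = bytes(data[pos:pos + length])
            pos += length
        else:
            raise ValueError('wire type {} not handled in this sketch'.format(wire_type))
        yield field_number, value


# Sample body: one user with id 2 and name "cuong" (assumed field numbers).
body = binascii.unhexlify('0a090802120563756f6e67')
for _, user_bytes in decode_fields(body):
    user = dict(decode_fields(user_bytes))
    print('User ID: {}'.format(user[1]))
    print('User Name: {}'.format(user[2].decode('utf-8')))
```

Note that the bytes carry only field numbers, never field names, so the producer and the consumer must be built from the same .proto file.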
Test Producer and Consumer
- First, run the producer:
(env) ~/working/source_code/protocol_buffer_python$ python producer.py
Enter User ID:
Then enter your values.
- After that, run the consumer. If you see output like the following, it is working properly:
(env) ~/working/source_code/protocol_buffer_python$ python consumer.py
[*] Waiting for messages. To exit press CTRL+C
User ID: 2
User Name: cuong ba ngoc
Email: [email protected]
So, we have finished using RabbitMQ as a queue to send and receive Protocol Buffers data. Thank you for reading.
You can see full source code on: