
Protocol Buffers and RabbitMQ on Ubuntu 14.04 Part 2

This article is the second post in a series.

In the previous article, we installed Protocol Buffers, defined a .proto file, and used the protocol buffer compiler to compile it. In this article, we continue with Step 4 and Step 5: writing a producer and a consumer that encode, transfer, and decode Protocol Buffers data through RabbitMQ. You can read about Steps 1, 2, and 3 in the previous post.

To use Protocol Buffers in my systems, I will:

  • Step 1: Install the protocol buffer compiler on Ubuntu 14.04
  • Step 2: Define the message structure in a .proto file
  • Step 3: Use the protocol buffer compiler to compile the .proto file
  • Step 4: Write a producer that uses the Python protocol buffer API to encode messages and send them to the RabbitMQ server
  • Step 5: Write a consumer that uses the Python protocol buffer API to receive messages from the RabbitMQ server and decode them

Step 4: Write a producer that uses the Python protocol buffer API to encode messages and send them to the RabbitMQ server

  • First, we need to set up the environment for this example. I will use Python 2.7 and the Pika library to work with RabbitMQ.

You will need to know how to use virtualenv and pip. Please read about them in the post: Set Up Package Manager (PIP) And Virtual Environments in Python on Ubuntu 14.04.

virtualenv -p /usr/bin/python2.7 env  
source env/bin/activate  
pip install pika  
  • Next, we need to install the Protocol Buffers library for Python.

You can download the Protocol Buffers source code, then copy its python directory into your project and install it from there:

cd ..  
git clone https://github.com/google/protobuf.git  
cd protocol_buffers_python  
cp -r ../protobuf/python .
cd python  
python setup.py install  
  • Next, create a config.py file that stores the RabbitMQ server information.
RABBIT_HOST = 'localhost'
QUEUE_TOPIC = 'topic_name'
  • Then, I will write rabbit.py to work with RabbitMQ.
import pika  
import config as cfg

class Rabbit():  
     def __init__(self):  
         self.conn = None  
         self.channel = None

     def connect(self):  
         self.conn = pika.BlockingConnection(pika.ConnectionParameters(host=cfg.RABBIT_HOST))  
         self.channel = self.conn.channel()  
    
     def close(self):  
         self.conn.close()  
    
     def send(self, topic, data):  
         # Open connection  
         self.connect()  
         # Declare queue to send data  
         self.channel.queue_declare(queue=topic)
         # Send data through the default exchange, routed by queue name
         self.channel.basic_publish(exchange='', routing_key=topic, body=data)
         print(" [x] Sent data to RabbitMQ")  
    
         # Close connection  
         self.close()  
    
     def receive(self, topic, callback):  
         # Open connection  
         self.connect()  
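         # Declare queue to receive data from
         self.channel.queue_declare(queue=topic)
         print(' [*] Waiting for messages. To exit press CTRL+C')
    
         # Listen and receive data from queue.
         # This is the pika 0.x call; pika 1.0+ expects
         # basic_consume(queue=topic, on_message_callback=callback, auto_ack=True)
         self.channel.basic_consume(callback, queue=topic, no_ack=True)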
         self.channel.start_consuming()
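
If you want to try the publish and callback flow before a RabbitMQ server is available, a small in-memory stand-in can help. The sketch below is only an illustration: FakeChannel and its drain method are hypothetical names, not part of Pika, and it mimics just the three calls used in rabbit.py. It shows that the callback receives four arguments (ch, method, properties, body), the same signature used by the consumer later in this post.

```python
from collections import deque


class FakeChannel(object):
    """Hypothetical in-memory stand-in for a Pika channel (not part of Pika)."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, queue):
        # Create the queue only if it does not exist yet
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        # With the default exchange (''), the routing key is the queue name
        self.queues[routing_key].append(body)

    def drain(self, queue, callback):
        # Deliver every pending message using the Pika-style callback signature;
        # method and properties are passed as None in this sketch
        while self.queues[queue]:
            body = self.queues[queue].popleft()
            callback(self, None, None, body)


received = []


def on_message(ch, method, properties, body):
    received.append(body)


channel = FakeChannel()
channel.queue_declare(queue="topic_name")
channel.basic_publish(exchange="", routing_key="topic_name", body=b"hello")
channel.drain("topic_name", on_message)
print(received)
```

This is useful only for quick local checks; it does not reproduce acknowledgements, exchanges, or connection handling.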
  • After that, we will write producer.py.
import users_pb2
import sys
import pika
import config as cfg
import rabbit

r = rabbit.Rabbit()
list_users = users_pb2.ListUsers()

def prompt_for_user(user):
    # Fill a User message from keyboard input (raw_input is Python 2 only)
    user.id = int(raw_input("Enter User ID: "))
    user.name = raw_input("Enter name: ")
    email = raw_input("Enter email address (blank for none): ")
    if email != '':
        user.email = email

def send_rabbit(data):
    # Standalone alternative to Rabbit.send(); it is not called below
    # Connect to RabbitMQ and create channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg.RABBIT_HOST))
    channel = connection.channel()
    # Declare queue to send data
    channel.queue_declare(queue=cfg.QUEUE_TOPIC)
    # Send data
    channel.basic_publish(exchange='', routing_key=cfg.QUEUE_TOPIC, body=data)
    print(" [x] Sent data to RabbitMQ")
    connection.close()

try:
    # Add a new user to the list and fill it in
    prompt_for_user(list_users.user.add())
    # Encode the message to its binary form
    data_encode = list_users.SerializeToString()
    # Send to RabbitMQ
    r.send(cfg.QUEUE_TOPIC, data_encode)

except Exception as e:
    print("Failed to send data")
    print(e)
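
To make the encoding step less of a black box, here is a rough sketch of the bytes that SerializeToString() produces, written with the standard library only. The field numbers are assumptions, because the .proto file from Part 1 is not repeated here: I assume id is field 1 and name is field 2 in User, and that ListUsers holds repeated User user as field 1. The helper names are made up for this illustration and are not part of the protobuf API.

```python
def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def encode_varint_field(field_number, value):
    # Key = (field_number << 3) | wire_type, with wire type 0 for varints
    return encode_varint((field_number << 3) | 0) + encode_varint(value)


def encode_bytes_field(field_number, payload):
    # Wire type 2 is length-delimited: key, then length, then raw bytes
    key = encode_varint((field_number << 3) | 2)
    return key + encode_varint(len(payload)) + payload


def encode_user(user_id, name):
    # ASSUMPTION: id is field 1, name is field 2
    return encode_varint_field(1, user_id) + encode_bytes_field(2, name.encode("utf-8"))


def encode_list_users(users):
    # ASSUMPTION: ListUsers declares `repeated User user = 1`
    return b"".join(encode_bytes_field(1, encode_user(uid, name)) for uid, name in users)


payload = encode_list_users([(2, "cuong")])
print(repr(payload))
```

For a single user with ID 2 and name "cuong", this gives an 11-byte payload, which is the kind of compact binary body that ends up in the RabbitMQ message. In real code you should keep using the generated users_pb2 classes; this sketch only shows what they do for the simplest field types.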

Step 5: Write a consumer that uses the Python protocol buffer API to receive messages from the RabbitMQ server and decode them

In the final step, we will write a consumer to receive the data and decode it. The content of consumer.py:

import users_pb2  
import sys  
import config as cfg

import rabbit

r = rabbit.Rabbit()  
list_users = users_pb2.ListUsers()

def show(list_users):  
    for user in list_users.user:  
        print("User ID: {}".format(user.id))  
        print("User Name: {}".format(user.name))  
        if user.HasField('email'):
            print("Email: {}".format(user.email))

def callback_rabbit(ch, method, properties, body):  
    #print("Method: {}".format(method))  
    #print("Properties: {}".format(properties))

    print("\n================================================\n")
    # Decode the binary message body into the ListUsers object
    list_users.ParseFromString(body)
    show(list_users)

    print("\n================================================\n")

try:  
    # Receive Data from rabbit  
    r.receive(cfg.QUEUE_TOPIC, callback_rabbit)  
except Exception as e:  
    print("Failed to receive data")
    print(e)
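
To illustrate what ParseFromString() has to do with the message body, the sketch below walks a raw payload and splits it into fields using only the standard library. It is a simplified illustration, not the protobuf implementation: it handles only varint and length-delimited fields, and the sample bytes assume a layout where ListUsers stores each User as field 1, with id as field 1 and name as field 2 (the real numbers come from the .proto file in Part 1).

```python
def decode_varint(data, pos):
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last byte
            return result, pos
        shift += 7


def iter_fields(data):
    """Yield (field_number, wire_type, value) for each field in data."""
    data = bytearray(data)  # indexing yields ints on Python 2 and 3
    pos = 0
    while pos < len(data):
        key, pos = decode_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # varint
            value, pos = decode_varint(data, pos)
        elif wire_type == 2:  # length-delimited (strings, nested messages)
            length, pos = decode_varint(data, pos)
            value = bytes(data[pos:pos + length])
            pos += length
        else:
            raise ValueError("wire type %d is not handled in this sketch" % wire_type)
        yield field_number, wire_type, value


# Sample body under the assumed layout: one user with id 2 and name "cuong"
body = b"\x0a\x09\x08\x02\x12\x05cuong"
for _, _, user_bytes in iter_fields(body):
    # Each outer field is a nested User message, so parse it again
    print(list(iter_fields(user_bytes)))
```

The generated users_pb2 classes do this work for you and also map field numbers back to names such as user.id and user.name, so the consumer above never touches raw bytes.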

Test Producer and Consumer

  • First, run the producer:
(env)[email protected]:~/working/source_code/protocol_buffer_python$ python producer.py

Enter User ID:  

Then enter your values when prompted.

  • After that, run the consumer. If you see output like the following, it is working properly:
(env)[email protected]:~/working/source_code/protocol_buffer_python$ python consumer.py  

[*] Waiting for messages. To exit press CTRL+C  
User ID: 2  
User Name: cuong ba ngoc  
Email: [email protected]  

 
So, we have used RabbitMQ as a queue to send and receive Protocol Buffers data. Thank you for reading.

You can find the full source code at:

https://github.com/cuongbangoc/protocolbufferspython
