#!/bin/bash /usr/scripts/python.sh
# -*- mode: python; coding: utf-8 -*-
#
# This code is a Crans production, based on pika documentation examples
# and on the RabbitMQ tutorials.
#
# This code is placed under the terms of GNU General Public License v3.
#
# Author   : Pierre-Elliott Bécue <becue@crans.org>
# Date     : 18/05/2014
# License  : GPLv3

import sys
import pika
import cranslib.clogger as clogger

# Handed to CLogger as its last argument (presumably switches on debug
# output -- confirm in cranslib.clogger).
DEBUG = False

# Module-wide logger. Its second argument is derived from the running script:
# the last "/"-separated component of argv[0] with every ".py" occurrence
# removed.
logger = clogger.CLogger("cmbprod", sys.argv[0].split("/")[-1].replace(".py", ""), "info", DEBUG)


class BasicProducer(object):
    """Basic CMB producer.

    It doesn't have to be asynchronous or anything, it's just a basic object
    that sends messages to a single exchange over a blocking pika connection.

    Typical use: build the producer, call ``send_message`` as often as
    needed, then ``close``. The connection and the channel are opened on
    demand.
    """

    def __init__(self, url, exchange_name, app_id, port=5672, credentials=None, ssl=False):
        """Store the connection settings; nothing is opened here.

        :param url: passed as ``host`` to ``pika.ConnectionParameters``.
        :param exchange_name: exchange every message is published to.
        :param app_id: value set as the ``app_id`` property of every message.
        :param port: passed as ``port`` to ``pika.ConnectionParameters``.
        :param credentials: passed through to pika unchanged (presumably a
            pika credentials object; ``None`` leaves the choice to pika).
        :param ssl: passed as ``ssl`` to ``pika.ConnectionParameters``.
        """
        # Both handles stay None until get_chan() (or the first
        # send_message()) opens them.
        self._connection = None
        self._channel = None

        self._exchange_name = exchange_name
        self._app_id = app_id
        self._url = url
        self._port = port
        self._credentials = credentials
        self._ssl = ssl

        logger.info("Initializing with app_id %s" % (self._app_id,))

    def connect(self):
        """Open a new blocking connection built from the stored settings.

        The connection is returned, not stored on the instance; ``get_chan``
        is the method that keeps it.

        :returns: a new ``pika.BlockingConnection``.
        """
        # One-element tuple so that a tuple-valued url cannot be mistaken for
        # the argument list of the format string.
        logger.info("Connecting to %s…" % (self._url,))
        parameters = pika.ConnectionParameters(
            host=self._url,
            port=self._port,
            credentials=self._credentials,
            ssl=self._ssl,
        )
        return pika.BlockingConnection(parameters)

    def get_chan(self):
        """Create a channel, (re)opening the connection first if needed.

        The connection is replaced when it was never opened or when pika
        reports it as no longer open. Errors raised by pika while connecting
        or while opening the channel propagate to the caller.
        """
        connection = self._connection
        if connection is None or not connection.is_open:
            logger.warning("Connection no longer working, reconnecting…")
            self._connection = self.connect()

        logger.info("Opening channel…")
        self._channel = self._connection.channel()
        logger.info("Channel active.")

    def send_message(self, routing_key, body):
        """Publish ``body`` on the configured exchange.

        A channel is opened first when none exists yet, so calling
        ``get_chan`` beforehand is optional.

        :param routing_key: routing key handed to ``basic_publish``.
        :param body: message payload handed to ``basic_publish``.
        :raises Exception: whatever the channel setup or the publish raised;
            a marker line is printed on stdout before re-raising.
        """
        try:
            if self._channel is None:
                self.get_chan()
            logger.info("Sending message %s with routing_key %s." % (body, routing_key))
            properties = pika.BasicProperties(
                # delivery_mode 2 marks the message as persistent in AMQP.
                delivery_mode=2,
                app_id=self._app_id,
            )
            self._channel.basic_publish(exchange=self._exchange_name,
                                        routing_key=routing_key,
                                        body=body,
                                        properties=properties)
        except Exception:
            # Parenthesised form prints the same text on Python 2 and 3.
            print("Failure on cmb.producer")
            raise

    def close(self):
        """Close the channel and the connection, if they were opened.

        Safe to call on a producer that never connected. Both handles are
        forgotten before closing, so a later ``get_chan`` starts from a fresh
        connection even if closing raised.
        """
        channel, connection = self._channel, self._connection
        self._channel = None
        self._connection = None

        try:
            if channel is not None:
                channel.close()
        finally:
            # Still release the connection when closing the channel failed.
            if connection is not None:
                connection.close()