import sys
import pika
import signal
import logging
import json
class Queue(object):
    """Thin wrapper around a single queue on a pika blocking connection.

    On construction the wrapper connects to the broker, opens a channel,
    declares ``qname`` as a durable queue and sets the channel prefetch
    count to 1.
    """

    def __init__(self, config, qname):
        """Connect to the broker and declare the queue.

        Args:
            config: Mapping providing the ``'user'``, ``'password'`` and
                ``'host'`` entries used to build the connection.
            qname: Name of the queue to declare, consume from and
                publish to.
        """
        self.qname = qname
        creds = pika.PlainCredentials(config['user'], config['password'])
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host=config['host'], credentials=creds))
        self.ch = self.conn.channel()
        self.ch.queue_declare(queue=qname, durable=True)
        self.ch.basic_qos(prefetch_count=1)
        self.logger = logging.getLogger('sm-queue')

    def start_consuming(self, callback):
        """Register ``callback`` as the consumer and start consuming.

        SIGINT and SIGTERM handlers are installed that stop the consumer,
        log the fact and exit the process via ``sys.exit()``.

        Args:
            callback: Consumer callable handed to the channel's
                ``basic_consume``.
        """
        # NOTE(review): the positional-callback form matches the pika < 1.0
        # signature; pika 1.x expects the queue first and the callback as
        # ``on_message_callback`` -- confirm against the pinned pika version.
        self.ch.basic_consume(callback, queue=self.qname)

        def stop_consuming(signum, frame):
            self.ch.stop_consuming()
            self.logger.info(' [v] Stopped consuming')
            sys.exit()

        signal.signal(signal.SIGINT, stop_consuming)
        signal.signal(signal.SIGTERM, stop_consuming)

        self.logger.info(' [*] Waiting for messages...')
        self.ch.start_consuming()

    def publish(self, msg):
        """Serialise ``msg`` to JSON and publish it to the queue.

        The message is sent through the default exchange (``''``) with the
        queue name as routing key.

        Args:
            msg: JSON-serialisable object to send.
        """
        # Serialise once and reuse the result for both the body and the log.
        body = json.dumps(msg)
        self.ch.basic_publish(
            exchange='',
            routing_key=self.qname,
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        self.logger.info(" [v] Sent %s to %s", body, self.qname)

    def close(self):
        """Close the underlying broker connection."""
        self.conn.close()