RabbitMQ 监控信息采集并上报falcon

rabbitmq  

上报逻辑

从rabbitmq-server的api接口读取相关数据,然后推送到falco-agent

汇报字段说明

key tag type note
rabbitmq.messages_ready name(Queue名字) GAUGE 队列中处于等待被消费状态消息数
rabbitmq.messages_unacknowledged name(Queue名字) GAUGE 队列中处于消费中状态的消息数
rabbitmq.messages_total name(Queue名字) GAUGE 队列中所有未完成消费的消息数,等于messages_ready+messages_unacknowledged
rabbitmq.ack_rate name(Queue名字) GAUGE 消费者ack的速率
rabbitmq.deliver_rate name(Queue名字) GAUGE deliver的速率
rabbitmq.deliver_get_rate name(Queue名字) GAUGE deliver_get的速率
rabbitmq.publish_rate name(Queue名字) GAUGE publish的速率

python脚本

#!/bin/env python
# -*- coding:utf-8 -*-


import  json, time, socket  
import requests  
from requests.auth import HTTPBasicAuth


class Resource():  
    def __init__(self, mq_status_url):
        self.host = socket.gethostname()
        self.url = mq_status_url

        self.step = 60
        self.ip = socket.gethostname()
        self.ts = int(time.time())

        self.keys = ('messages_ready', 'messages_unacknowledged')
        self.rates = ('ack', 'deliver', 'deliver_get', 'publish')


    def run(self):
        resp = requests.get(self.url, auth=HTTPBasicAuth('adminuser', '*********'))

        data = json.loads(resp.text)
        tag = ''

        p = []
        for queue in data:
            # ready and unack
            msg_total = 0
            for key in self.keys:
                q = {}
                q["endpoint"] = self.ip
                q['timestamp'] = self.ts
                q['step'] = self.step
                q['counterType'] = "GAUGE"
                q['metric'] = 'rabbitmq.%s' % key
                q['tags'] = 'name=%s,%s' % (queue['name'], tag)
                q['value'] = int(queue[key])
                msg_total += q['value']
                p.append(q)

            # total
            q = {}
            q["endpoint"] = self.ip
            q['timestamp'] = self.ts
            q['step'] = self.step
            q['counterType'] = "GAUGE"
            q['metric'] = 'rabbitmq.messages_total'
            q['tags'] = 'name=%s,%s' % (queue['name'], tag)
            q['value'] = msg_total
            p.append(q)

            # rates
            for rate in self.rates:
                q = {}
                q["endpoint"] = self.ip
                q['timestamp'] = self.ts
                q['step'] = self.step
                q['counterType'] = "GAUGE"
                q['metric'] = 'rabbitmq.%s_rate' % rate
                q['tags'] = 'name=%s,%s' % (queue['name'], tag)
                try:
                    q['value'] = int(queue['message_stats']["%s_details" % rate]['rate'])
                except:
                    q['value'] = 0
                p.append(q)

        return p

def pull_data(datapoints):

    r = requests.post(falcon_addr, data=json.dumps(datapoints))

if __name__ == "__main__":  
    falcon_addr="http://127.0.0.1:1988/v1/push"
    mq_status_url="http://127.0.0.1:15672/api/queues"

    d = Resource(mq_status_url).run()
    print(json.dumps(d,indent=4))
    if d:
        pull_data(d)

备注

  • 使用时注意修改mq_status_urlHTTPBasicAuth用户名和密码