上报逻辑
从rabbitmq-server的api接口读取相关数据,然后推送到falco-agent
汇报字段说明
key | tag | type | note |
rabbitmq.messages_ready | name(Queue名字) | GAUGE | 队列中处于等待被消费状态消息数 |
rabbitmq.messages_unacknowledged | name(Queue名字) | GAUGE | 队列中处于消费中状态的消息数 |
rabbitmq.messages_total | name(Queue名字) | GAUGE | 队列中所有未完成消费的消息数,等于messages_ready+messages_unacknowledged |
rabbitmq.ack_rate | name(Queue名字) | GAUGE | 消费者ack的速率 |
rabbitmq.deliver_rate | name(Queue名字) | GAUGE | deliver的速率 |
rabbitmq.deliver_get_rate | name(Queue名字) | GAUGE | deliver_get的速率 |
rabbitmq.publish_rate | name(Queue名字) | GAUGE | publish的速率 |
python脚本
#!/bin/env python
# -*- coding:utf-8 -*-
import json, time, socket
import requests
from requests.auth import HTTPBasicAuth
class Resource():
def __init__(self, mq_status_url):
self.host = socket.gethostname()
self.url = mq_status_url
self.step = 60
self.ip = socket.gethostname()
self.ts = int(time.time())
self.keys = ('messages_ready', 'messages_unacknowledged')
self.rates = ('ack', 'deliver', 'deliver_get', 'publish')
def run(self):
resp = requests.get(self.url, auth=HTTPBasicAuth('adminuser', '*********'))
data = json.loads(resp.text)
tag = ''
p = []
for queue in data:
# ready and unack
msg_total = 0
for key in self.keys:
q = {}
q["endpoint"] = self.ip
q['timestamp'] = self.ts
q['step'] = self.step
q['counterType'] = "GAUGE"
q['metric'] = 'rabbitmq.%s' % key
q['tags'] = 'name=%s,%s' % (queue['name'], tag)
q['value'] = int(queue[key])
msg_total += q['value']
p.append(q)
# total
q = {}
q["endpoint"] = self.ip
q['timestamp'] = self.ts
q['step'] = self.step
q['counterType'] = "GAUGE"
q['metric'] = 'rabbitmq.messages_total'
q['tags'] = 'name=%s,%s' % (queue['name'], tag)
q['value'] = msg_total
p.append(q)
# rates
for rate in self.rates:
q = {}
q["endpoint"] = self.ip
q['timestamp'] = self.ts
q['step'] = self.step
q['counterType'] = "GAUGE"
q['metric'] = 'rabbitmq.%s_rate' % rate
q['tags'] = 'name=%s,%s' % (queue['name'], tag)
try:
q['value'] = int(queue['message_stats']["%s_details" % rate]['rate'])
except:
q['value'] = 0
p.append(q)
return p
def pull_data(datapoints):
r = requests.post(falcon_addr, data=json.dumps(datapoints))
if __name__ == "__main__":
falcon_addr="http://127.0.0.1:1988/v1/push"
mq_status_url="http://127.0.0.1:15672/api/queues"
d = Resource(mq_status_url).run()
print(json.dumps(d,indent=4))
if d:
pull_data(d)
备注
- 使用时注意修改
mq_status_url
和HTTPBasicAuth用户名和密码