Description
I'm using a python plugin with collectd-5.5.0. I compiled it with CFLAGS="-g -O0" and captured a full stack backtrace of the crash.
This should be easy to reproduce, since I haven't patched my source at all, and the crash happens when the service I am monitoring is down (so you don't even need rabbitmq-server running). I'll give my build instructions, the python plugin script, and its configuration last.
/opt/collectd/sbin/collectd -C /etc/collectd.conf -f # Crashes after 21 minutes 10 seconds
I am using a rabbitmq.py plugin script (full script below). When my rabbitmq-server is not running, rabbitmq.py hits an "Unhandled python exception in read callback", which is probably sloppy coding on the plugin's part, but collectd recovers from this gracefully by suspending the plugin for 10, 20, 40, ... seconds (doubling each time the read function fails). So far, this is normal.
However, after a number of these failures, there is a segmentation fault. This is what the /var/log/messages reports at that point:
Oct 1 15:22:54 vds collectd[31856]: Error: <urlopen error [Errno 111] Connection refused>
Oct 1 15:22:54 vds collectd[31856]: Unhandled python exception in read callback: TypeError: 'NoneType' object is not iterable
Oct 1 15:22:54 vds collectd[31856]: Traceback (most recent call last):
Oct 1 15:22:54 vds collectd[31856]: File "/opt/collectd/lib/collectd/python/rabbitmq.py", line 222, in read#012 for node in get_info("%s/nodes" % (base_url)):
Oct 1 15:22:54 vds collectd[31856]: TypeError: 'NoneType' object is not iterable
Oct 1 15:22:54 vds kernel: collectd[31865]: segfault at 0 ip 00007f06d47fb1f4 sp 00007f06c6041d60 error 6 in libpython2.6.so.1.0[7f06d4761000+15d000]
If the segfault hadn't happened, that last message would have been the standard doubled suspension of the plugin; the suspensions already served (10 + 20 + 40 + 80 + 160 + 320 + 640 = 1270 seconds) account for the 21:10 after which the crash occurs. The message would have been something like this (but wasn't):
Oct 1 15:22:54 vds collectd[31865]: read-function of plugin `python.rabbitmq' failed. Will suspend it for 1280.000 seconds.
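Before the full script (given last): the Python-level exception itself is straightforward. get_info() in rabbitmq.py returns None when urlopen() raises URLError ("Connection refused" while rabbitmq-server is down), and read() then iterates over that None. Stripped down to the pattern (names and URL taken from the script below):

def get_info(url):
    # rabbitmq.py logs the URLError and returns None when the
    # management API is unreachable
    return None

for node in get_info("http://localhost:15672/api/nodes"):
    # TypeError: 'NoneType' object is not iterable -- the exception
    # the read callback leaks back into collectd
    pass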
Since I compiled collectd from source with CFLAGS="-g -O0", I was able to capture the full stack backtrace:
(gdb) bt full
#0 0x00007f06d47fb1f4 in ?? () from /usr/lib64/libpython2.6.so.1.0
No symbol table info available.
#1 0x00007f06d4b0b947 in cpy_log_exception (context=0x7f06d4b130f8 "read callback") at python.c:325
l = 3
i = 3
typename = 0xd03fa4 "TypeError"
message = 0xd08914 "'NoneType' object is not iterable"
type = 0x7f06d4ac6e00
value = 0xd14a28
traceback = 0xd14950
tn = 0xd03f80
m = 0xd088f0
list = 0xd14908
#2 0x00007f06d4b0ba1d in cpy_read_callback (data=0xcca6a8) at python.c:337
gil_state = PyGILState_UNLOCKED
c = 0xc51e00
ret = 0x0
#3 0x000000000040ddec in plugin_read_thread (args=0x0) at plugin.c:575
callback = 0x7f06d4b0b9c2 <cpy_read_callback>
rf = 0xcca6a0
old_ctx = {interval = 0}
status = 0
rf_type = 1
now = 1550190464909938976
rc = 110
__PRETTY_FUNCTION__ = "plugin_read_thread"
#4 0x00007f06d76f6a51 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#5 0x00007f06d723f9ad in clone () from /lib64/libc.so.6
No symbol table info available.
Note the value of type = 0x7f06d4ac6e00 in frame #1: unlike value and traceback, it points up into the region where the shared libraries are mapped rather than into the heap, which looks suspicious to me. This is the line of code at python.c:325:
Py_DECREF(type);
The value of type was assigned in one of these lines of code at the top of the cpy_log_exception() function:
PyErr_Fetch(&type, &value, &traceback);
PyErr_NormalizeException(&type, &value, &traceback);
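If I am reading this right, the crash is in collectd's own handling of the fetched exception and has nothing to do with rabbitmq specifically. On that assumption (which I have not verified -- rabbitmq.py is my only reproducer so far), any python read callback that keeps raising the same exception should eventually walk into the same code path, e.g. something as small as:

# minimal_crash.py -- hypothetical minimal reproducer, untested.
# Assumption: every exception escaping a read callback goes through
# cpy_log_exception() in python.c, the function in frame #1 above.
import collectd

def read(data=None):
    # Same failure mode as rabbitmq.py: iterate over None.
    for node in None:
        pass

collectd.register_read(read)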
Here is how I built my collectd-5.5.0 from source code (https://collectd.org/files/collectd-5.5.0.tar.gz):
./configure --prefix=/opt/collectd-5.5.0 --localstatedir=/var \
--enable-python --enable-perl --enable-libcurl --enable-nginx CFLAGS="-g -O0"
make
make install
ln -s collectd-5.5.0 /opt/collectd
You will probably need my python plugin to reproduce this easily. I doubt the problem is related to the rest of my /etc/collectd.conf, but here is my /etc/collectd.d/python:
<LoadPlugin "python">
Globals true
</LoadPlugin>
<Plugin "python">
ModulePath "/opt/collectd/lib/collectd/python"
Import rabbitmq
<Module rabbitmq>
Username "guest"
Password "guest"
Realm "RabbitMQ Management"
Host "localhost"
Port "15672"
<Ignore "queue">
Regex "amq-gen-.*"
Regex "tmp-.*"
</Ignore>
</Module>
</Plugin>
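(If the hypothetical minimal_crash.py sketched above pans out, its config would be even shorter: the same LoadPlugin block plus something like the following. I have only actually run the rabbitmq config shown here.)

<Plugin "python">
    ModulePath "/opt/collectd/lib/collectd/python"
    Import minimal_crash
</Plugin>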
And this is the python plugin script (/opt/collectd/lib/collectd/python/rabbitmq.py). As I said, I don't think you need rabbitmq-server installed at all, since the problem happens when the server is unreachable. Sorry this is so lengthy:
"""
python plugin for collectd to obtain rabbitmq stats
Originally from https://github.com/NYTimes/collectd-rabbitmq, modified
the metric names to fit naturally in the DonorChoose.org hierarchy:
* Since we don't use vhost, we use Hostname from /etc/collectd.conf
* Use "rabbitmq-" prefix for "queues" and "exchanges" plugin names,
to group these under the "rabbitmq" plugin.
* Shorten the "rabbitmq_details" metric_type to "details", since the
prefix was redundant (it appears in the plugin name one level above).
Example of metric names (NY = NY Times original, DC = our name):
* NY (rabbit = 2nd part of rabbit@rabbit, rabbitmq_default = / vhost):
test/rabbit/rabbitmq/disk_free # rabbit = 2nd component of rabbit@rabbit
test/rabbitmq_default/queues-email_low/messages
test/rabbitmq_default/queues-email_low/rabbitmq_details-messages/rate
* DC (rabbit = Hostname from /etc/collectd.conf):
test/rabbit/rabbitmq/disk_free
test/rabbit/rabbitmq-queues-email_low/messages
test/rabbit/rabbitmq-queues-email_low/details-messages/rate
"""
import collectd
import urllib2
import urllib
import json
import re
RABBIT_API_URL = "http://{host}:{port}/api/"
QUEUE_MESSAGE_STATS = ['messages', 'messages_ready', 'messages_unacknowledged']
QUEUE_STATS = ['memory', 'consumers']
MESSAGE_STATS = ['ack', 'publish', 'publish_in', 'publish_out', 'confirm',
'deliver', 'deliver_noack', 'get', 'get_noack', 'deliver_get',
'redeliver', 'return']
MESSAGE_DETAIL = ['avg', 'avg_rate', 'rate', 'sample']
NODE_STATS = ['disk_free', 'disk_free_limit', 'fd_total',
'fd_used', 'mem_limit', 'mem_used',
'proc_total', 'proc_used', 'processors', 'run_queue',
'sockets_total', 'sockets_used']
PLUGIN_CONFIG = {
'username': 'guest',
'password': 'guest',
'host': 'localhost',
'port': 15672,
'realm': 'RabbitMQ Management'
}
def configure(config_values):
'''
Load information from configuration file
'''
global PLUGIN_CONFIG
collectd.info('Configuring RabbitMQ Plugin')
for config_value in config_values.children:
collectd.info("%s = %s" % (config_value.key,
len(config_value.values) > 0))
if len(config_value.values) > 0:
if config_value.key == 'Username':
PLUGIN_CONFIG['username'] = config_value.values[0]
elif config_value.key == 'Password':
PLUGIN_CONFIG['password'] = config_value.values[0]
elif config_value.key == 'Host':
PLUGIN_CONFIG['host'] = config_value.values[0]
elif config_value.key == 'Port':
PLUGIN_CONFIG['port'] = config_value.values[0]
elif config_value.key == 'Realm':
PLUGIN_CONFIG['realm'] = config_value.values[0]
elif config_value.key == 'Ignore':
type_rmq = config_value.values[0]
PLUGIN_CONFIG['ignore'] = {type_rmq: []}
for regex in config_value.children:
PLUGIN_CONFIG['ignore'][type_rmq].append(
re.compile(regex.values[0]))
def init():
'''
    Initialize connection to rabbitmq
    '''
    collectd.info('Initializing RabbitMQ Plugin')
def get_info(url):
'''
return json object from url
'''
try:
info = urllib2.urlopen(url)
except urllib2.HTTPError as http_error:
collectd.error("Error: %s" % (http_error))
return None
except urllib2.URLError as url_error:
collectd.error("Error: %s" % (url_error))
return None
return json.load(info)
def dispatch_values(values, host, plugin, plugin_instance, metric_type,
type_instance=None):
'''
dispatch metrics to collectd
Args:
values (tuple): the values to dispatch
host: (str): the name of the vhost
plugin (str): the name of the plugin. Should be queue/exchange
plugin_instance (str): the queue/exchange name
metric_type: (str): the name of metric
type_instance: Optional
'''
collectd.debug("Dispatching %s %s %s %s %s\n\t%s " % (host, plugin,
plugin_instance, metric_type, type_instance, values))
metric = collectd.Values()
if host:
metric.host = host
metric.plugin = plugin
if plugin_instance:
metric.plugin_instance = plugin_instance
metric.type = metric_type
if type_instance:
metric.type_instance = type_instance
metric.values = values
metric.dispatch()
def dispatch_message_stats(data, vhost, plugin, plugin_instance):
"""
Sends message stats to collectd.
"""
if not data:
collectd.debug("No data for %s in vhost %s" % (plugin, vhost))
return
for name in MESSAGE_STATS:
dispatch_values((data.get(name, 0),), vhost, plugin,
plugin_instance, name)
def dispatch_queue_metrics(queue, vhost):
'''
Dispatches queue metrics for queue in vhost
'''
for name in QUEUE_STATS:
values = list((queue.get(name, 0),))
dispatch_values(values, None, 'rabbitmq-queues', queue['name'],
name)
for name in QUEUE_MESSAGE_STATS:
values = list((queue.get(name, 0),))
dispatch_values(values, None, 'rabbitmq-queues', queue['name'],
name)
details = queue.get("%s_details" % name, None)
if not details:
continue
values = list()
for detail in MESSAGE_DETAIL:
values.append(details.get(detail, 0))
dispatch_values(values, None, 'rabbitmq-queues', queue['name'],
'details', name)
dispatch_message_stats(queue.get('message_stats', None), None,
'rabbitmq-queues', queue['name'])
def dispatch_exchange_metrics(exchange, vhost):
'''
Dispatches exchange metrics for exchange in vhost
'''
dispatch_message_stats(exchange.get('message_stats', None), None,
'rabbitmq-exchanges', exchange['name'])
def dispatch_node_metrics(node):
'''
Dispatches node metrics
'''
for name in NODE_STATS:
dispatch_values((node.get(name, 0),), None,
'rabbitmq', None, name)
def want_to_ignore(type_rmq, name):
"""
Applies ignore regex to the queue.
"""
if 'ignore' in PLUGIN_CONFIG:
if type_rmq in PLUGIN_CONFIG['ignore']:
for regex in PLUGIN_CONFIG['ignore'][type_rmq]:
match = regex.match(name)
if match:
return True
return False
def read(input_data=None):
'''
reads all metrics from rabbitmq
'''
collectd.debug("Reading data with input = %s" % (input_data))
base_url = RABBIT_API_URL.format(host=PLUGIN_CONFIG['host'],
port=PLUGIN_CONFIG['port'])
auth_handler = urllib2.HTTPBasicAuthHandler()
auth_handler.add_password(realm=PLUGIN_CONFIG['realm'],
uri=base_url,
user=PLUGIN_CONFIG['username'],
passwd=PLUGIN_CONFIG['password'])
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
#First get all the nodes
for node in get_info("%s/nodes" % (base_url)):
dispatch_node_metrics(node)
#Then get all vhost
for vhost in get_info("%s/vhosts" % (base_url)):
vhost_name = urllib.quote(vhost['name'], '')
collectd.debug("Found vhost %s" % vhost['name'])
for queue in get_info("%s/queues/%s" % (base_url, vhost_name)):
queue_name = urllib.quote(queue['name'], '')
collectd.debug("Found queue %s" % queue['name'])
if not want_to_ignore("queue", queue_name):
queue_data = get_info("%s/queues/%s/%s" % (base_url,
vhost_name,
queue_name))
if queue_data is not None:
dispatch_queue_metrics(queue_data, vhost)
else:
collectd.warning("Cannot get data back from %s/%s queue" %
(vhost_name, queue_name))
for exchange in get_info("%s/exchanges/%s" % (base_url,
vhost_name)):
exchange_name = urllib.quote(exchange['name'], '')
if exchange_name:
collectd.debug("Found exchange %s" % exchange['name'])
exchange_data = get_info("%s/exchanges/%s/%s" % (
base_url, vhost_name, exchange_name))
dispatch_exchange_metrics(exchange_data, vhost)
def shutdown():
'''
Shutdown connection to rabbitmq
'''
collectd.info('RabbitMQ plugin shutting down')
# Register callbacks
collectd.register_config(configure)
collectd.register_init(init)
collectd.register_read(read)
#collectd.register_write(write)
collectd.register_shutdown(shutdown)
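For what it's worth, the band-aid I am considering on my side is a tiny wrapper so that read() never raises when the management API is down -- just a sketch, not applied in any of the runs above, and it does not fix the collectd-side crash:

def get_info_or_empty(url):
    '''Hypothetical wrapper around get_info(): turn the None it returns
    on connection errors into an empty list, so the for-loops in read()
    quietly do nothing instead of raising TypeError.'''
    info = get_info(url)
    return [] if info is None else info

read() would then call get_info_or_empty() wherever it currently loops over get_info(). But even with that applied, an unhandled exception in a read callback should only cost me one read interval; it should not be able to segfault the whole daemon, which is why I am filing this.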
A few custom types are needed for the rabbitmq.py plugin. Add these lines to the end of /opt/collectd/share/collectd/types.db:
########################################################################
# Custom types for the collectd-rabbitmq python plugin.
# Originally from https://github.com/NYTimes/collectd-rabbitmq,
# modified to shorten the "rabbitmq_details" metric_type to "details"
disk_free value:GAUGE:0:U
disk_free_limit value:GAUGE:0:U
fd_total value:GAUGE:0:U
fd_used value:GAUGE:0:U
mem_limit value:GAUGE:0:U
mem_used value:GAUGE:0:U
proc_total value:GAUGE:0:U
proc_used value:GAUGE:0:U
processors value:GAUGE:0:U
run_queue value:GAUGE:0:U
sockets_total value:GAUGE:0:U
sockets_used value:GAUGE:0:U
messages value:GAUGE:0:U
messages_ready value:GAUGE:0:U
messages_unacknowledged value:GAUGE:0:U
consumers value:GAUGE:0:U
details avg:GAUGE:0:U, avg_rate:GAUGE:0:U, rate:GAUGE:0:U, samples:GAUGE:0:U
ack value:GAUGE:0:U
publish value:GAUGE:0:U
publish_in value:GAUGE:0:U
publish_out value:GAUGE:0:U
confirm value:GAUGE:0:U
deliver value:GAUGE:0:U
deliver_noack value:GAUGE:0:U
get value:GAUGE:0:U
get_noack value:GAUGE:0:U
deliver_get value:GAUGE:0:U
redeliver value:GAUGE:0:U
return value:GAUGE:0:U