from __future__ import unicode_literals
import re
from .metrics_core import Metric
from .samples import Sample
try:
import StringIO
except ImportError:
# Python 3
import io as StringIO
def text_string_to_metric_families(text):
"""Parse Prometheus text format from a unicode string.
See text_fd_to_metric_families.
"""
for metric_family in text_fd_to_metric_families(StringIO.StringIO(text)):
yield metric_family
ESCAPE_SEQUENCES = {
'\\\\': '\\',
'\\n': '\n',
'\\"': '"',
}
def replace_escape_sequence(match):
return ESCAPE_SEQUENCES[match.group(0)]
HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')
def _replace_help_escaping(s):
return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
def _replace_escaping(s):
return ESCAPING_RE.sub(replace_escape_sequence, s)
def _is_character_escaped(s, charpos):
num_bslashes = 0
while (charpos > num_bslashes and
s[charpos - 1 - num_bslashes] == '\\'):
num_bslashes += 1
return num_bslashes % 2 == 1
def _parse_labels(labels_string):
labels = {}
# Return if we don't have valid labels
if "=" not in labels_string:
return labels
escaping = False
if "\\" in labels_string:
escaping = True
# Copy original labels
sub_labels = labels_string
try:
# Process one label at a time
while sub_labels:
# The label name is before the equal
value_start = sub_labels.index("=")
label_name = sub_labels[:value_start]
sub_labels = sub_labels[value_start + 1:].lstrip()
# Find the first quote after the equal
quote_start = sub_labels.index('"') + 1
value_substr = sub_labels[quote_start:]
# Find the last unescaped quote
i = 0
while i < len(value_substr):
i = value_substr.index('"', i)
if not _is_character_escaped(value_substr, i):
break
i += 1
# The label value is between the first and last quote
quote_end = i + 1
label_value = sub_labels[quote_start:quote_end]
# Replace escaping if needed
if escaping:
label_value = _replace_escaping(label_value)
labels[label_name.strip()] = label_value
# Remove the processed label from the sub-slice for next iteration
sub_labels = sub_labels[quote_end + 1:]
next_comma = sub_labels.find(",") + 1
sub_labels = sub_labels[next_comma:].lstrip()
return labels
except ValueError:
raise ValueError("Invalid labels: %s" % labels_string)
# If we have multiple values only consider the first
def _parse_value_and_timestamp(s):
s = s.lstrip()
separator = " "
if separator not in s:
separator = "\t"
values = [value.strip() for value in s.split(separator) if value.strip()]
if not values:
return float(s), None
value = float(values[0])
timestamp = (float(values[-1])/1000) if len(values) > 1 else None
return value, timestamp
def _parse_sample(text):
# Detect the labels in the text
try:
label_start, label_end = text.index("{"), text.rindex("}")
# The name is before the labels
name = text[:label_start].strip()
# We ignore the starting curly brace
label = text[label_start + 1:label_end]
# The value is after the label end (ignoring curly brace and space)
value, timestamp = _parse_value_and_timestamp(text[label_end + 2:])
return Sample(name, _parse_labels(label), value, timestamp)
# We don't have labels
except ValueError:
# Detect what separator is used
separator = " "
if separator not in text:
separator = "\t"
name_end = text.index(separator)
name = text[:name_end]
# The value is after the name
value, timestamp = _parse_value_and_timestamp(text[name_end:])
return Sample(name, {}, value, timestamp)
def text_fd_to_metric_families(fd):
"""Parse Prometheus text format from a file descriptor.
This is a laxer parser than the main Go parser,
so successful parsing does not imply that the parsed
text meets the specification.
Yields Metric's.
"""
name = ''
documentation = ''
typ = 'untyped'
samples = []
allowed_names = []
def build_metric(name, documentation, typ, samples):
# Munge counters into OpenMetrics representation
# used internally.
if typ == 'counter':
if name.endswith('_total'):
name = name[:-6]
else:
new_samples = []
for s in samples:
new_samples.append(Sample(s[0] + '_total', *s[1:]))
samples = new_samples
metric = Metric(name, documentation, typ)
metric.samples = samples
return metric
for line in fd:
line = line.strip()
if line.startswith('#'):
parts = line.split(None, 3)
if len(parts) < 2:
continue
if parts[1] == 'HELP':
if parts[2] != name:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric
name = parts[2]
typ = 'untyped'
samples = []
allowed_names = [parts[2]]
if len(parts) == 4:
documentation = _replace_help_escaping(parts[3])
else:
documentation = ''
elif parts[1] == 'TYPE':
if parts[2] != name:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric
name = parts[2]
documentation = ''
samples = []
typ = parts[3]
allowed_names = {
'counter': [''],
'gauge': [''],
'summary': ['_count', '_sum', ''],
'histogram': ['_count', '_sum', '_bucket'],
}.get(typ, [''])
allowed_names = [name + n for n in allowed_names]
else:
# Ignore other comment tokens
pass
elif line == '':
# Ignore blank lines
pass
else:
sample = _parse_sample(line)
if sample.name not in allowed_names:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric, yield immediately as untyped singleton
name = ''
documentation = ''
typ = 'untyped'
samples = []
allowed_names = []
yield build_metric(sample[0], documentation, typ, [sample])
else:
samples.append(sample)
if name != '':
yield build_metric(name, documentation, typ, samples)