# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
import base64
import json
import logging
import os
import requests # pylint: disable=import-error
from lib.results import result_types
HTML_SUMMARY_MAX = 4096
_HTML_SUMMARY_ARTIFACT = ''
_TEST_LOG_ARTIFACT = ''
# Maps result_types to the luci test-result.proto.
# https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#TestStatus
RESULT_MAP = {
result_types.UNKNOWN: 'ABORT',
result_types.PASS: 'PASS',
result_types.FAIL: 'FAIL',
result_types.CRASH: 'CRASH',
result_types.TIMEOUT: 'ABORT',
result_types.SKIP: 'SKIP',
result_types.NOTRUN: 'SKIP',
}
def TryInitClient():
"""Tries to initialize a result_sink_client object.
Assumes that rdb stream is already running.
Returns:
A ResultSinkClient for the result_sink server else returns None.
"""
try:
with open(os.environ['LUCI_CONTEXT']) as f:
sink = json.load(f)['result_sink']
return ResultSinkClient(sink)
except KeyError:
return None
class ResultSinkClient(object):
"""A class to store the sink's post configurations and make post requests.
This assumes that the rdb stream has been called already and that the
server is listening.
"""
def __init__(self, context):
base_url = 'http://%s/prpc/luci.resultsink.v1.Sink' % context['address']
self.test_results_url = base_url + '/ReportTestResults'
self.report_artifacts_url = base_url + '/ReportInvocationLevelArtifacts'
self.update_invocation_url = base_url + '/UpdateInvocation'
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink %s' % context['auth_token'],
}
self.session = requests.Session()
self.session.headers.update(headers)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self):
"""Closes the session backing the sink."""
self.session.close()
def Post(self,
test_id,
status,
duration,
test_log,
test_file,
variant=None,
artifacts=None,
failure_reason=None,
html_artifact=None,
tags=None):
"""Uploads the test result to the ResultSink server.
This assumes that the rdb stream has been called already and that
server is ready listening.
Args:
test_id: A string representing the test's name.
status: A string representing if the test passed, failed, etc...
duration: An int representing time in ms.
test_log: A string representing the test's output.
test_file: A string representing the file location of the test.
variant: An optional dict of variant key value pairs as the
additional variant sent from test runners, which can override
or add to the variants passed to `rdb stream` command.
artifacts: An optional dict of artifacts to attach to the test.
failure_reason: An optional string with the reason why the test failed.
Should be None if the test did not fail.
html_artifact: An optional html-formatted string to prepend to the test's
log. Useful to encode click-able URL links in the test log, since that
won't be formatted in the test_log.
tags: An optional list of tuple of key name and value to prepend to the
test's tags.
Returns:
N/A
"""
assert status in RESULT_MAP
expected = status in (result_types.PASS, result_types.SKIP)
result_db_status = RESULT_MAP[status]
tr = {
'expected':
expected,
'status':
result_db_status,
'tags': [
{
'key': 'test_name',
'value': test_id,
},
{
# Status before getting mapped to result_db statuses.
'key': 'raw_status',
'value': status,
}
],
'testId':
test_id,
'testMetadata': {
'name': test_id,
}
}
if tags:
tr['tags'].extend({
'key': key_name,
'value': value
} for (key_name, value) in tags)
if variant:
tr['variant'] = {'def': variant}
artifacts = artifacts or {}
tr['summaryHtml'] = html_artifact if html_artifact else ''
# If over max supported length of html summary, replace with artifact
# upload.
if (test_log
and len(tr['summaryHtml']) + len(_TEST_LOG_ARTIFACT) > HTML_SUMMARY_MAX
or len(tr['summaryHtml']) > HTML_SUMMARY_MAX):
b64_summary = base64.b64encode(tr['summaryHtml'].encode()).decode()
artifacts.update({'HTML Summary': {'contents': b64_summary}})
tr['summaryHtml'] = _HTML_SUMMARY_ARTIFACT
if test_log:
# Upload the original log without any modifications.
b64_log = base64.b64encode(test_log.encode()).decode()
artifacts.update({'Test Log': {'contents': b64_log}})
tr['summaryHtml'] += _TEST_LOG_ARTIFACT
if artifacts:
tr['artifacts'] = artifacts
if failure_reason:
tr['failureReason'] = {
'primaryErrorMessage': _TruncateToUTF8Bytes(failure_reason, 1024)
}
if duration is not None:
# Duration must be formatted to avoid scientific notation in case
# number is too small or too large. Result_db takes seconds, not ms.
# Need to use float() otherwise it does substitution first then divides.
tr['duration'] = '%.9fs' % float(duration / 1000.0)
if test_file and str(test_file).startswith('//'):
tr['testMetadata']['location'] = {
'file_name': test_file,
'repo': 'https://chromium.googlesource.com/chromium/src',
}
res = self.session.post(url=self.test_results_url,
data=json.dumps({'testResults': [tr]}))
res.raise_for_status()
def ReportInvocationLevelArtifacts(self, artifacts):
"""Uploads invocation-level artifacts to the ResultSink server.
This is for artifacts that don't apply to a single test but to the test
invocation as a whole (eg: system logs).
Args:
artifacts: A dict of artifacts to attach to the invocation.
"""
req = {'artifacts': artifacts}
res = self.session.post(url=self.report_artifacts_url, data=json.dumps(req))
res.raise_for_status()
def UpdateInvocation(self, invocation, update_mask):
"""Update the invocation to the ResultSink server.
Details can be found in the proto luci.resultsink.v1.UpdateInvocationRequest
Args:
invocation: a dict representation of luci.resultsink.v1.Invocation proto
update_mask: a dict representation of google.protobuf.FieldMask proto
"""
req = {
'invocation': invocation,
'update_mask': update_mask,
}
res = self.session.post(url=self.update_invocation_url,
data=json.dumps(req))
res.raise_for_status()
def UpdateInvocationExtendedProperties(self, extended_properties, keys=None):
"""Update the extended_properties field of an invocation.
Details can be found in the "extended_properties" field of the proto
luci.resultdb.v1.Invocation.
Args:
extended_properties: a dict containing the content of extended_properties.
The value in the dict shall be a dict containing a "@type" key
representing the data schema, and corresponding data.
keys: (Optional) a list of keys in extended_properties to add, replace,
or remove. If a key exists in "keys", but not in "extended_properties",
this is considered as deleting the key from the resultdb record side
If None, the keys in "extended_properties" dict will be used.
"""
if not keys:
keys = extended_properties.keys()
mask_paths = ['extended_properties.%s' % key for key in keys]
invocation = {'extended_properties': extended_properties}
update_mask = {'paths': mask_paths}
self.UpdateInvocation(invocation, update_mask)
def _TruncateToUTF8Bytes(s, length):
""" Truncates a string to a given number of bytes when encoded as UTF-8.
Ensures the given string does not take more than length bytes when encoded
as UTF-8. Adds trailing ellipsis (...) if truncation occurred. A truncated
string may end up encoding to a length slightly shorter than length because
only whole Unicode codepoints are dropped.
Args:
s: The string to truncate.
length: the length (in bytes) to truncate to.
"""
try:
encoded = s.encode('utf-8')
# When encode throws UnicodeDecodeError in py2, it usually means the str is
# already encoded and has non-ascii chars. So skip re-encoding it.
except UnicodeDecodeError:
encoded = s
if len(encoded) > length:
# Truncate, leaving space for trailing ellipsis (...).
encoded = encoded[:length - 3]
# Truncating the string encoded as UTF-8 may have left the final codepoint
# only partially present. Pass 'ignore' to acknowledge and ensure this is
# dropped.
return encoded.decode('utf-8', 'ignore') + "..."
return s