#!/usr/bin/env python # # # Run librdkafka regression tests on with different SASL parameters # and broker verisons. # # Requires: # trivup python module # gradle in your PATH from cluster_testing import LibrdkafkaTestCluster, print_report_summary, print_test_report_summary from LibrdkafkaTestApp import LibrdkafkaTestApp import subprocess import time import tempfile import os import sys import argparse import json import tempfile def test_it (version, deploy=True, conf={}, rdkconf={}, tests=None, debug=False): """ @brief Create, deploy and start a Kafka cluster using Kafka \p version Then run librdkafka's regression tests. """ cluster = LibrdkafkaTestCluster(version, conf, debug=debug) # librdkafka's regression tests, as an App. rdkafka = LibrdkafkaTestApp(cluster, version, _rdkconf, tests=tests) rdkafka.do_cleanup = False rdkafka.local_tests = False if deploy: cluster.deploy() cluster.start(timeout=30) print('# Connect to cluster with bootstrap.servers %s' % cluster.bootstrap_servers()) rdkafka.start() print('# librdkafka regression tests started, logs in %s' % rdkafka.root_path()) try: rdkafka.wait_stopped(timeout=60*10) rdkafka.dbg('wait stopped: %s, runtime %ds' % (rdkafka.state, rdkafka.runtime())) except KeyboardInterrupt: print('# Aborted by user') report = rdkafka.report() report['root_path'] = rdkafka.root_path() cluster.stop(force=True) cluster.cleanup() return report def handle_report (report, version, suite): """ Parse test report and return tuple (Passed(bool), Reason(str)) """ test_cnt = report.get('tests_run', 0) if test_cnt == 0: return (False, 'No tests run') passed = report.get('tests_passed', 0) failed = report.get('tests_failed', 0) if 'all' in suite.get('expect_fail', []) or version in suite.get('expect_fail', []): expect_fail = True else: expect_fail = False if expect_fail: if failed == test_cnt: return (True, 'All %d/%d tests failed as expected' % (failed, test_cnt)) else: return (False, '%d/%d tests failed: expected all to fail' % (failed, test_cnt)) else: if failed > 0: return (False, '%d/%d tests passed: expected all to pass' % (passed, test_cnt)) else: return (True, 'All %d/%d tests passed as expected' % (passed, test_cnt)) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Run librdkafka test suit using SASL on a trivupped cluster') parser.add_argument('--conf', type=str, dest='conf', default=None, help='trivup JSON config object (not file)') parser.add_argument('--rdkconf', type=str, dest='rdkconf', default=None, help='trivup JSON config object (not file) for LibrdkafkaTestApp') parser.add_argument('--tests', type=str, dest='tests', default=None, help='Test to run (e.g., "0002")') parser.add_argument('--report', type=str, dest='report', default=None, help='Write test suites report to this filename') parser.add_argument('--debug', action='store_true', dest='debug', default=False, help='Enable trivup debugging') parser.add_argument('versions', type=str, default=None, nargs='*', help='Limit broker versions to these') args = parser.parse_args() conf = dict() rdkconf = dict() if args.conf is not None: conf.update(json.loads(args.conf)) if args.rdkconf is not None: rdkconf.update(json.loads(args.rdkconf)) if args.tests is not None: tests = args.tests.split(',') else: tests = None # Test version,supported mechs + suite matrix versions = list() if len(args.versions): for v in args.versions: versions.append((v, ['PLAIN','GSSAPI'])) else: versions = [('trunk', ['PLAIN','GSSAPI']), ('0.9.0.1', ['GSSAPI']), ('0.8.2.2', [])] sasl_plain_conf = {'sasl_mechanisms': 'PLAIN', 'sasl_users': 'myuser=mypassword'} ssl_sasl_plain_conf = {'sasl_mechanisms': 'PLAIN', 'sasl_users': 'myuser=mypassword', 'security.protocol': 'SSL'} sasl_kerberos_conf = {'sasl_mechanisms': 'GSSAPI', 'sasl_servicename': 'kafka'} suites = [{'name': 'SASL PLAIN', 'conf': sasl_plain_conf, 'expect_fail': ['0.9.0.1', '0.8.2.2']}, {'name': 'PLAINTEXT (no SASL)'}, {'name': 'SSL (no SASL)', 'conf': {'security.protocol': 'SSL'}, 'expect_fail': ['0.8.2.2']}, {'name': 'SASL_SSL PLAIN', 'conf': ssl_sasl_plain_conf, 'expect_fail': ['0.9.0.1', '0.8.2.2']}, {'name': 'SASL PLAIN with wrong username', 'conf': sasl_plain_conf, 'rdkconf': {'sasl_users': 'wrongjoe=mypassword'}, 'expect_fail': ['all']}, {'name': 'SASL Kerberos', 'conf': sasl_kerberos_conf, 'expect_fail': ['0.8.2.2']}] pass_cnt = 0 fail_cnt = 0 for version,supported in versions: if len(args.versions) > 0 and version not in args.versions: print('### Skipping version %s' % version) continue for suite in suites: _conf = conf.copy() _conf.update(suite.get('conf', {})) _rdkconf = _conf.copy() _rdkconf.update(rdkconf) _rdkconf.update(suite.get('rdkconf', {})) if 'version' not in suite: suite['version'] = dict() # Disable SASL broker config if broker version does # not support the selected mechanism mech = suite.get('conf', dict()).get('sasl_mechanisms', None) if mech is not None and mech not in supported: print('# Disabled SASL for broker version %s' % version) _conf.pop('sasl_mechanisms', None) # Run tests print('#### Version %s, suite %s: STARTING' % (version, suite['name'])) report = test_it(version, tests=tests, conf=_conf, rdkconf=_rdkconf, debug=args.debug) # Handle test report report['version'] = version passed,reason = handle_report(report, version, suite) report['PASSED'] = passed report['REASON'] = reason if passed: print('\033[42m#### Version %s, suite %s: PASSED: %s\033[0m' % (version, suite['name'], reason)) pass_cnt += 1 else: print('\033[41m#### Version %s, suite %s: FAILED: %s\033[0m' % (version, suite['name'], reason)) print_test_report_summary('%s @ %s' % \ (suite['name'], version), report) fail_cnt += 1 print('#### Test output: %s/stderr.log' % (report['root_path'])) suite['version'][version] = report # Write test suite report JSON file if args.report is not None: test_suite_report_file = args.report f = open(test_suite_report_file, 'w') else: fd, test_suite_report_file = tempfile.mkstemp(prefix='test_suite_', suffix='.json', dir='.') f = os.fdopen(fd, 'w') full_report = {'suites': suites, 'pass_cnt': pass_cnt, 'fail_cnt': fail_cnt, 'total_cnt': pass_cnt+fail_cnt} f.write(json.dumps(full_report).encode('ascii')) f.close() print('\n\n\n') print_report_summary(full_report) print('#### Full test suites report in: %s' % test_suite_report_file) if pass_cnt == 0 or fail_cnt > 0: sys.exit(1) else: sys.exit(0)