import base64
import contextlib
import email.parser
import hmac
import json
import os
import sys
from io import StringIO

# Authorization scheme that may precede the secret in the request header.
_BEARER_SCHEME = "Bearer"


def _is_authorized(header_value):
    """Return True when *header_value* carries the configured runner secret.

    The header may be either the bare secret or ``Bearer <secret>``. Only a
    single leading scheme is removed, so secrets that themselves contain the
    text "Bearer" still work. An unset or empty ``RUNNER_SECRET`` never
    authorizes anyone (fail closed).
    """
    expected = os.getenv("RUNNER_SECRET")
    if not expected:
        return False
    token = header_value.strip()
    if token.startswith(_BEARER_SCHEME):
        token = token[len(_BEARER_SCHEME):].strip()
    # Constant-time comparison; bytes are used because compare_digest only
    # accepts ASCII-only str arguments.
    return hmac.compare_digest(expected.encode(), token.encode())


def _parse_multipart(event):
    """Return ``{form field name: raw payload bytes}`` for the request body.

    Raises KeyError when the event lacks a body or content-type header, and
    ValueError when the body is not valid base64 or is not multipart.
    """
    raw_body = event["body"]
    if event.get("isBase64Encoded"):
        body = base64.b64decode(raw_body)
    else:
        body = raw_body.encode()
    print("the body", body)

    # The email parser needs the Content-Type header (it carries the
    # boundary), so prepend it. Parsing as bytes keeps non-ASCII payloads
    # intact; going through str would mangle them on get_payload(decode=True).
    header = b"Content-Type: " + event["headers"]["content-type"].encode() + b"\r\n\r\n"
    msg = email.message_from_bytes(header + body)
    if not msg.is_multipart():
        raise ValueError("request body is not multipart")
    return {
        part.get_param("name", header="Content-Disposition"): part.get_payload(decode=True)
        for part in msg.get_payload()
    }


def lambda_handler(event, context):
    """Handle a runner request.

    Routes (``event["rawPath"]``):
      * ``/challenge`` -- returns 200 ``OK``.
      * ``/execute``   -- expects a multipart body with an ``event`` part
        (JSON), a ``metadata`` part (JSON with an ``entrypoint`` key) and a
        part named after that entrypoint containing Python source. The source
        is executed with this module's globals and its stdout is returned.
      * anything else  -- returns 400.

    Every request must carry the runner secret in the ``authorization``
    header, otherwise 401 is returned. A malformed ``/execute`` request
    yields 400. Exceptions raised by the executed script propagate to the
    caller.
    """
    # Executed scripts read the event through this module-level name.
    global grafana_event

    if not _is_authorized(event["headers"].get("authorization", "")):
        return {
            'statusCode': 401,
            'body': json.dumps('Incorrect runner secret')
        }

    path = event["rawPath"]
    if path == "/challenge":
        return {
            'statusCode': 200,
            'body': 'OK'
        }
    if path != "/execute":
        return {
            'statusCode': 400,
            'body': 'Invalid endpoint: ' + path
        }

    try:
        parts = _parse_multipart(event)
        event_payload = json.loads(parts["event"])
        entrypoint = json.loads(parts["metadata"])["entrypoint"]
        source = parts[entrypoint]
    except (KeyError, TypeError, ValueError) as err:
        return {
            'statusCode': 400,
            'body': 'Malformed execute request: ' + repr(err)
        }

    grafana_event = event_payload
    print("entrypoint", entrypoint)

    captured = StringIO()
    # redirect_stdout restores sys.stdout even if the script raises, so a
    # failing script cannot leave a warm container with a hijacked stdout.
    with contextlib.redirect_stdout(captured):
        # SECURITY: this runs request-supplied code with full module globals.
        # It is the purpose of this runner, but the secret check above is the
        # only barrier -- keep RUNNER_SECRET strong and private.
        exec(source, globals())

    output = captured.getvalue()
    print(output)
    return {
        'statusCode': 200,
        'body': "Output:\n" + output
    }