/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #define _POSIX_C_SOURCE 200809L #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "libFuzzingEngine.h" // bool VERBOSE = true; bool VERBOSE = false; // bool ERRORS = true; bool ERRORS = false; #define MAX_SIZE 1024 typedef char str[MAX_SIZE]; typedef struct app_data_t { str address; str container_id; pn_rwbytes_t message_buffer; int message_count; int received; pn_proactor_t *proactor; bool finished; } app_data_t; static const int BATCH = 1000; /* Batch size for unlimited receive */ static int exit_code = 0; static void check_condition(pn_event_t *e, pn_condition_t *cond) { if (VERBOSE) printf("beginning check_condition\n"); if (pn_condition_is_set(cond)) { exit_code = 1; if (VERBOSE || ERRORS) fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), pn_condition_get_name(cond), pn_condition_get_description(cond)); } } static void decode_message(pn_delivery_t *dlv) { static char buffer[MAX_SIZE]; ssize_t len; // try to decode the message body if (pn_delivery_pending(dlv) < MAX_SIZE) { // read in the raw data len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); if (len > 0) { // decode it into a proton message pn_message_t *m = pn_message(); if (PN_OK == pn_message_decode(m, buffer, len)) { pn_string_t *s = pn_string(NULL); pn_inspect(pn_message_body(m), s); printf("%s\n", pn_string_get(s)); pn_free(s); } pn_message_free(m); } } } static void handle(app_data_t *app, pn_event_t *event) { switch (pn_event_type(event)) { case PN_CONNECTION_INIT: { pn_connection_t *c = pn_event_connection(event); pn_connection_set_container(c, app->container_id); pn_connection_open(c); pn_session_t *s = pn_session(c); pn_session_open(s); pn_link_t *l = pn_receiver(s, "my_receiver"); pn_terminus_set_address(pn_link_source(l), app->address); pn_link_open(l); /* cannot receive without granting credit: */ pn_link_flow(l, app->message_count ? app->message_count : BATCH); } break; case PN_DELIVERY: { /* A message has been received */ pn_link_t *link = NULL; pn_delivery_t *dlv = pn_event_delivery(event); if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { link = pn_delivery_link(dlv); decode_message(dlv); /* Accept the delivery */ pn_delivery_update(dlv, PN_ACCEPTED); /* done with the delivery, move to the next and free it */ pn_link_advance(link); pn_delivery_settle(dlv); /* dlv is now freed */ if (app->message_count == 0) { /* receive forever - see if more credit is needed */ if (pn_link_credit(link) < BATCH / 2) { /* Grant enough credit to bring it up to BATCH: */ pn_link_flow(link, BATCH - pn_link_credit(link)); } } else if (++app->received >= app->message_count) { /* done receiving, close the endpoints */ printf("%d messages received\n", app->received); pn_session_t *ssn = pn_link_session(link); pn_link_close(link); pn_session_close(ssn); pn_connection_close(pn_session_connection(ssn)); } } } break; case PN_TRANSPORT_ERROR: check_condition(event, pn_transport_condition(pn_event_transport(event))); pn_connection_close(pn_event_connection(event)); break; case PN_CONNECTION_REMOTE_CLOSE: check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); pn_connection_close(pn_event_connection(event)); break; case PN_SESSION_REMOTE_CLOSE: check_condition(event, pn_session_remote_condition(pn_event_session(event))); pn_connection_close(pn_event_connection(event)); break; case PN_LINK_REMOTE_CLOSE: case PN_LINK_REMOTE_DETACH: check_condition(event, pn_link_remote_condition(pn_event_link(event))); pn_connection_close(pn_event_connection(event)); break; case PN_PROACTOR_INACTIVE: app->finished = true; break; default: break; } } double now(void) { struct timespec spec; if (clock_gettime(CLOCK_MONOTONIC, &spec) != 0) { perror("clock_gettime"); exit(errno); } return (double)spec.tv_sec + (double)spec.tv_nsec / 1000000.0; } int sut(void) { /* Default values for application and connection. */ app_data_t app = {{0}}; app.message_count = 2; snprintf(app.container_id, sizeof(app.container_id), "%s:%d", "fuzz_proactor_recv", getpid()); const char *address = "127.0.0.1:amqp"; strncpy(app.address, "jms.queue.example", sizeof(app.address)); if (VERBOSE) printf("before proactor\n"); /* Create the proactor and connect */ app.proactor = pn_proactor(); pn_proactor_connect(app.proactor, pn_connection(), address); if (VERBOSE) printf("before loop\n"); double thence = now(); do { if (VERBOSE) printf("before set proactor timeout\n"); pn_proactor_set_timeout(app.proactor, 100); if (VERBOSE) printf("before proactor wait\n"); pn_event_batch_t *events = pn_proactor_wait(app.proactor); pn_event_t *e; if (VERBOSE) printf("before proactor next batch\n"); while ((e = pn_event_batch_next(events))) { handle(&app, e); } pn_proactor_done(app.proactor, events); if (VERBOSE) printf("before reloop\n"); double deltat = now() - thence; if (VERBOSE) printf("deltat %f", deltat); if (deltat > 1) { app.finished = true; } } while (!app.finished); if (VERBOSE) printf("after loop\n"); pn_proactor_free(app.proactor); free(app.message_buffer.start); return exit_code; } void serve_data(const uint8_t *Data, size_t Size) { int sockfd; if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("socket"); _Exit(errno); } int reuseaddr = 1; if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { perror("setsockopt"); _Exit(errno); } struct sockaddr_in self; memset(&self, 0, sizeof(self)); self.sin_family = AF_INET; self.sin_port = htons(5672); self.sin_addr.s_addr = INADDR_ANY; if (bind(sockfd, (struct sockaddr *)&self, sizeof(self)) != 0) { perror("bind"); // Lets unblock the old child that listens by starting new client to read // from it. It breaks the fuzzing somewhat, but it is better to mess up one // than many inputs. if (VERBOSE) printf("unblocking old bound child\n"); kill(getppid(), SIGUSR1); _Exit(errno); } if (VERBOSE) printf("bound\n"); if (listen(sockfd, 1) != 0) { perror("listen"); _Exit(errno); } if (VERBOSE) printf("listened, lets run sut\n"); kill(getppid(), SIGUSR1); struct sockaddr_in client_addr; socklen_t addrlen = sizeof(client_addr); int clientfd = accept(sockfd, (struct sockaddr *)&client_addr, &addrlen); if (VERBOSE) printf("%s:%d connected\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); if (VERBOSE) printf("will send\n"); send(clientfd, Data, Size, 0); // sleep(1); close(clientfd); close(sockfd); if (VERBOSE) printf("done serving\n"); } void run_sut(int s) { if (VERBOSE) printf("running sut\n"); sut(); if (VERBOSE) printf("finished running sut\n"); } void signal_callback_handler(int signum) { if (VERBOSE) printf("Caught signal SIGPIPE %d\n", signum); } bool DoInitialization(void) { struct sigaction sa; sa.sa_handler = run_sut; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; /* Restart functions if interrupted by handler */ if (sigaction(SIGUSR1, &sa, NULL) == -1) { perror("sigaction"); exit(2); } sa.sa_handler = signal_callback_handler; if (sigaction(SIGPIPE, &sa, NULL) == -1) { perror("sigaction"); exit(2); } return true; } int LLVMFuzzerInitialize(int *argc, char ***argv) { DoInitialization(); return 0; } int prev_pid = 0; int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) { // sometimes, esp. with AFL, but libFuzz too, // the old socket is still bound for new run and // it skips all new runs... if (prev_pid != 0) { kill(SIGKILL, prev_pid); } pid_t pid = fork(); if (pid < 0) { perror("fork"); exit(errno); } if (pid == 0) { // child serve_data(Data, Size); _Exit(0); } else { // parent prev_pid = pid; if (VERBOSE) printf("waiting for child\n"); siginfo_t status; waitid(P_PID, pid, &status, WEXITED); if (VERBOSE) printf("finished waiting for child\n"); } return 0; }