pg_bench_lino.c
application/octet-stream
Filename: pg_bench_lino.c
Type: application/octet-stream
Part: 0
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <math.h>
#include <time.h>
#include <unistd.h>
#include <pwd.h>
#include <getopt.h>
#include <libpq-fe.h>
// Capacity of the static connection pool; also the upper bound accepted for -c/--connections.
#define MAX_CONNECTIONS 1000
// Capacity of the static channel table; also the upper bound accepted for -n/--channels.
#define MAX_CHANNELS 1000
// NOTE(review): MIN_PERCENTAGE is not referenced anywhere in the visible source.
#define MIN_PERCENTAGE 0.0001
// Upper bound for the probability options, which are expressed as percentages (0-100).
#define MAX_PERCENTAGE 100.0
// Run-time configuration of one benchmark run, filled from defaults and
// command-line options in main().
typedef struct {
// Target number of connections; stored as double, truncated to int before use.
double connections;
// Target number of channels; stored as double, truncated to int before use.
double channels;
// Pause between ticks of the main loop, in milliseconds.
double tick_ms;
// Per-tick probabilities, expressed as percentages (0-100); they are divided
// by 100 before being used in the Bernoulli trials.
double listen_prob;
double unlisten_prob;
double notify_prob;
double unlisten_all_prob;
// Number of ticks to run; main() initializes it to -1 to detect a missing --ticks.
int max_ticks;
unsigned int seed; // 0 = use current time (random), non-zero = deterministic
} BenchmarkConfig;
// Connection pool: slots [0, current_connection_count) hold open libpq connections.
static PGconn *connections[MAX_CONNECTIONS];
static int current_connection_count = 0;
// Channel names: slots [0, current_channel_count) are in use.
static char channels[MAX_CHANNELS][33]; // 128-bit random hex: 32 chars + null terminator
static int current_channel_count = 0;
static bool listening[MAX_CONNECTIONS][MAX_CHANNELS]; // Tracks LISTEN state for each (connection, channel) pair
// Number of true entries in listening[][], maintained incrementally by the
// functions that modify the matrix.
static int count_listening_pairs = 0;
// Error log stream; opened lazily ("errors.log", append mode) by the logging
// helpers and closed at the end of main().
static FILE *error_log = NULL;
// Notification tracking for correctness checking.
// One record per channel index: it describes the most recent NOTIFY sent on
// that channel and which connections are still expected to receive it.
typedef struct {
char notification_id[33]; // Random hex ID of the latest NOTIFY sent on this channel
bool pending; // Is there a pending notification awaiting confirmation?
bool expected[MAX_CONNECTIONS]; // Which connections should receive it
bool received[MAX_CONNECTIONS]; // Which connections have received it
time_t sent_time; // When the NOTIFY was sent (for timeout detection)
struct timespec delivery_start; // High-resolution start time for delivery measurement
bool delivery_recorded; // Has the first delivery time been recorded?
int listener_count; // Number of listeners when sent (written on send; not read in the visible code)
} PendingNotification;
// Indexed by channel index, parallel to channels[].
static PendingNotification pending_notifications[MAX_CHANNELS];
// Total number of correctness errors logged so far; a non-zero value makes
// main() return a failing exit status.
static int correctness_error_count = 0;
// Timing statistics for each operation type
typedef struct {
unsigned long count; // Number of operations
double min_ms; // Minimum time in milliseconds
double max_ms; // Maximum time in milliseconds
double total_ms; // Total time (for calculating average)
} OperationStats;
// Operation kinds that are timed. The numeric values are used as indices into
// operation_stats[] and operation_names[], so the order of both tables must
// match this enum.
typedef enum {
OP_LISTEN = 0,
OP_UNLISTEN = 1,
OP_UNLISTEN_ALL = 2,
OP_NOTIFY = 3,
OP_NOTIFY_DELIVERY = 4, // Time from just before NOTIFY is executed to its first receipt
OP_TYPE_COUNT = 5 // Number of operation kinds (array size), not an operation itself
} OperationType;
// Accumulated timing statistics, indexed by OperationType.
static OperationStats operation_stats[OP_TYPE_COUNT];
// Display names for the statistics table, indexed by OperationType.
const char *operation_names[] = {
"LISTEN",
"UNLISTEN",
"UNLISTEN *",
"NOTIFY",
"NOTIFY delivery"
};
// Histogram for NOTIFY delivery times (fine-grained at low latencies)
#define HISTOGRAM_BUCKETS 15
// Exclusive upper bound of each bucket, in milliseconds. A sample is counted
// in the first bucket whose bound is greater than it; the final INFINITY bound
// makes the last bucket catch every remaining finite sample.
static const double histogram_bounds[HISTOGRAM_BUCKETS] = {
0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, INFINITY
};
// Display labels, parallel to histogram_bounds[].
static const char *histogram_labels[HISTOGRAM_BUCKETS] = {
"0-0.05ms",
"0.05-0.1ms",
"0.1-0.15ms",
"0.15-0.2ms",
"0.2-0.3ms",
"0.3-0.4ms",
"0.4-0.5ms",
"0.5-0.75ms",
"0.75-1ms",
"1-2ms",
"2-5ms",
"5-10ms",
"10-20ms",
"20-50ms",
"50+ms"
};
// Number of delivery-time samples recorded per bucket, parallel to histogram_bounds[].
static unsigned long histogram_counts[HISTOGRAM_BUCKETS];
/* ============================================================================
* HELPER FUNCTIONS
* ============================================================================ */
/**
 * Fold one timing sample into the running statistics of an operation type.
 *
 * op_type    - which entry of operation_stats[] to update
 * elapsed_ms - duration of the sample, in milliseconds
 *
 * The first sample recorded for a type initializes both the minimum and the
 * maximum; later samples only widen the range.
 */
void update_operation_stats(OperationType op_type, double elapsed_ms) {
    OperationStats *entry = &operation_stats[op_type];
    const bool first_sample = (entry->count == 0);

    entry->count += 1;
    entry->total_ms += elapsed_ms;

    if (first_sample || elapsed_ms < entry->min_ms) {
        entry->min_ms = elapsed_ms;
    }
    if (first_sample || elapsed_ms > entry->max_ms) {
        entry->max_ms = elapsed_ms;
    }
}
/**
 * Append a record describing a failed PostgreSQL command to errors.log.
 *
 * conn_idx - index of the connection the command ran on
 * chan_idx - index of the channel involved (callers pass -1 when none applies)
 * query    - the SQL text that was executed
 * res      - result object whose error message is written to the log
 *
 * The log file is opened on first use (append mode). If it cannot be opened
 * the record is dropped silently.
 */
void log_error(int conn_idx, int chan_idx, const char *query, PGresult *res) {
    if (error_log == NULL) {
        error_log = fopen("errors.log", "a");
    }
    if (error_log == NULL) {
        return; // Nowhere to write the record
    }

    time_t current = time(NULL);
    char *stamp = ctime(&current);
    size_t stamp_len = strlen(stamp);
    stamp[stamp_len - 1] = '\0'; // ctime() output ends with '\n'; strip it

    FILE *out = error_log;
    fprintf(out, "[%s] Connection %d, Channel %d\n", stamp, conn_idx, chan_idx);
    fprintf(out, "Query: %s\n", query);
    fprintf(out, "Error: %s\n", PQresultErrorMessage(res));
    fprintf(out, "---\n");
    fflush(out);
}
/**
 * Perform a Bernoulli trial with the given probability.
 *
 * probability - chance of success in the range [0, 1]
 *
 * Returns true with probability p and false with probability 1-p. Exactly one
 * rand() value is consumed per call, so runs stay reproducible for a given seed.
 */
bool bernoulli_trial(double probability) {
    // rand() yields values in [0, RAND_MAX] inclusive. Dividing by RAND_MAX + 1
    // maps them onto [0, 1), so probability 1.0 always succeeds and
    // probability 0.0 never does. Dividing by RAND_MAX would produce exactly
    // 1.0 for the largest value and make a 100% trial fail occasionally.
    double sample = (double)rand() / ((double)RAND_MAX + 1.0);
    return sample < probability;
}
/**
 * Fill `buffer` with 32 pseudo-random lowercase hex digits plus a terminating
 * NUL (33 bytes written in total).
 *
 * Each digit is taken from one rand() call, so the output is reproducible for
 * a given srand() seed.
 */
void generate_random_hex(char *buffer) {
    static const char digits[] = "0123456789abcdef";
    char *cursor = buffer;
    char *const terminator = buffer + 32;

    while (cursor < terminator) {
        *cursor++ = digits[rand() % 16];
    }
    *terminator = '\0';
}
/* ============================================================================
* CONNECTION MANAGEMENT FUNCTIONS
* ============================================================================ */
/**
 * Append one character to a bounded, NUL-terminated buffer.
 * Returns the length the string would have with unlimited space, so a result
 * >= cap signals truncation (the same convention snprintf uses).
 */
static size_t conninfo_put(char *buf, size_t cap, size_t len, char c) {
    if (len + 1 < cap) {
        buf[len] = c;
        buf[len + 1] = '\0';
    }
    return len + 1;
}

/**
 * Append `keyword='value'` to a connection string, separated from any
 * previous parameter by a single space. Single quotes and backslashes inside
 * the value are escaped with a backslash so that values containing spaces or
 * quotes are parsed as one value.
 */
static size_t conninfo_append_param(char *buf, size_t cap, size_t len,
                                    const char *keyword, const char *value) {
    if (len > 0) {
        len = conninfo_put(buf, cap, len, ' ');
    }
    for (const char *k = keyword; *k != '\0'; k++) {
        len = conninfo_put(buf, cap, len, *k);
    }
    len = conninfo_put(buf, cap, len, '=');
    len = conninfo_put(buf, cap, len, '\'');
    for (const char *v = value; *v != '\0'; v++) {
        if (*v == '\'' || *v == '\\') {
            len = conninfo_put(buf, cap, len, '\\');
        }
        len = conninfo_put(buf, cap, len, *v);
    }
    return conninfo_put(buf, cap, len, '\'');
}

/**
 * Build a PostgreSQL connection string from environment variables.
 *
 * PGHOST defaults to "localhost" and PGPORT to "5432". The user is PGUSER,
 * else the account name of the current uid, else "postgres". The database
 * name is always the same as the user. A password parameter is added only
 * when PGPASSWORD is set.
 *
 * Every value is single-quoted and escaped, so values containing spaces,
 * quotes or backslashes cannot corrupt the string.
 *
 * Returns a pointer to a static buffer that is overwritten by the next call.
 * If the parameters do not fit, the string is truncated and a warning is
 * printed to stderr.
 */
char* get_connection_string() {
    static char conninfo[512];
    const char *pghost = getenv("PGHOST");
    const char *pgport = getenv("PGPORT");
    const char *pguser = getenv("PGUSER");
    const char *pgpassword = getenv("PGPASSWORD");

    // Fall back to the account name of the current user, then to "postgres"
    const char *user = pguser;
    if (!user) {
        struct passwd *pw = getpwuid(getuid());
        user = pw ? pw->pw_name : "postgres";
    }

    const size_t cap = sizeof(conninfo);
    size_t len = 0;
    conninfo[0] = '\0';
    len = conninfo_append_param(conninfo, cap, len, "host", pghost ? pghost : "localhost");
    len = conninfo_append_param(conninfo, cap, len, "port", pgport ? pgport : "5432");
    len = conninfo_append_param(conninfo, cap, len, "user", user);
    len = conninfo_append_param(conninfo, cap, len, "dbname", user);
    if (pgpassword) {
        len = conninfo_append_param(conninfo, cap, len, "password", pgpassword);
    }

    if (len >= cap) {
        fprintf(stderr, "Warning: connection string truncated to %d bytes\n", (int)(cap - 1));
    }
    return conninfo;
}
/**
 * Open one more database connection and append it to the pool.
 *
 * Returns 1 when the connection was established and stored, 0 when the pool
 * is already at MAX_CONNECTIONS or the connection attempt did not reach
 * CONNECTION_OK (the failed handle is released).
 */
int add_connection() {
    int added = 0;

    if (current_connection_count < MAX_CONNECTIONS) {
        PGconn *fresh = PQconnectdb(get_connection_string());
        if (PQstatus(fresh) == CONNECTION_OK) {
            // Take the next free slot
            connections[current_connection_count] = fresh;
            current_connection_count += 1;
            added = 1;
        } else {
            PQfinish(fresh);
        }
    }
    return added;
}
/**
 * Close the most recently added connection and drop it from the pool.
 *
 * Every listening flag recorded for that connection is cleared and the
 * listening-pair counter is reduced accordingly before the connection is
 * closed.
 *
 * Returns 1 on success, 0 when the pool is empty.
 */
int remove_connection() {
    if (current_connection_count <= 0) {
        return 0;
    }

    const int last = --current_connection_count;
    bool *flags = listening[last];

    // Forget every channel this connection was listening on
    for (int chan = 0; chan < current_channel_count; chan++) {
        if (!flags[chan]) {
            continue;
        }
        flags[chan] = false;
        count_listening_pairs -= 1;
    }

    PQfinish(connections[last]);
    connections[last] = NULL;
    return 1;
}
/**
 * Move the connection pool one step towards `target`.
 *
 * At most one connection is added or removed per call; nothing happens when
 * the pool already has the target size. The outcome of the add/remove attempt
 * is not reported.
 */
void manage_connections(int target) {
    if (target > current_connection_count) {
        (void)add_connection();
        return;
    }
    if (target < current_connection_count) {
        (void)remove_connection();
    }
}
/**
 * Create one more channel at the end of the channel table.
 *
 * The channel name is a freshly generated 32-digit hex string.
 *
 * Returns 1 on success, 0 when the table already holds MAX_CHANNELS entries.
 */
int add_channel() {
    const int slot = current_channel_count;

    if (slot >= MAX_CHANNELS) {
        return 0;
    }
    generate_random_hex(channels[slot]);
    current_channel_count = slot + 1;
    return 1;
}
/**
 * Drop the most recently added channel from the channel table.
 *
 * Listening flags of all current connections for that channel are cleared
 * (adjusting the listening-pair counter), the name slot is emptied and any
 * pending notification recorded for the channel is cancelled.
 *
 * Returns 1 on success, 0 when there is no channel to remove.
 */
int remove_channel() {
    if (current_channel_count <= 0) {
        return 0;
    }

    const int last = --current_channel_count;

    // Nobody can be listening on a channel that no longer exists
    for (int conn = 0; conn < current_connection_count; conn++) {
        if (!listening[conn][last]) {
            continue;
        }
        listening[conn][last] = false;
        count_listening_pairs -= 1;
    }

    channels[last][0] = '\0';
    pending_notifications[last].pending = false; // Cancel outstanding tracking
    return 1;
}
/**
 * Move the channel table one step towards `target`.
 *
 * At most one channel is added or removed per call; nothing happens when the
 * table already has the target size. The outcome of the add/remove attempt is
 * not reported.
 */
void manage_channels(int target) {
    if (target > current_channel_count) {
        (void)add_channel();
        return;
    }
    if (target < current_channel_count) {
        (void)remove_channel();
    }
}
/**
 * Execute LISTEN or UNLISTEN for one (connection, channel) pair.
 *
 * conn_idx      - index into connections[]
 * chan_idx      - index into channels[]
 * should_listen - true to issue LISTEN, false to issue UNLISTEN
 *
 * The round-trip time of the command is always recorded in the operation
 * statistics. The local listening state and the listening-pair counter are
 * updated only when the server reports success. A failed command is logged
 * and leaves the tracked state untouched, so the tracker does not claim a
 * subscription change the server never made.
 */
void execute_listen_unlisten(int conn_idx, int chan_idx, bool should_listen) {
    char query[100];
    const char *cmd = should_listen ? "LISTEN" : "UNLISTEN";
    snprintf(query, sizeof(query), "%s \"%s\"", cmd, channels[chan_idx]);

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    PGresult *res = PQexec(connections[conn_idx], query);
    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed_ms = (end.tv_sec - start.tv_sec) * 1000.0 +
                        (end.tv_nsec - start.tv_nsec) / 1000000.0;
    update_operation_stats(should_listen ? OP_LISTEN : OP_UNLISTEN, elapsed_ms);

    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
        // The command failed, so the tracked state must not change.
        log_error(conn_idx, chan_idx, query, res);
        PQclear(res);
        return;
    }
    PQclear(res);

    // Command succeeded: mirror the new state and keep the pair counter in sync
    bool was_listening = listening[conn_idx][chan_idx];
    listening[conn_idx][chan_idx] = should_listen;
    if (should_listen && !was_listening) {
        count_listening_pairs++;
    } else if (!should_listen && was_listening) {
        count_listening_pairs--;
    }
}
/**
 * Execute UNLISTEN * on one connection to stop listening on all channels.
 *
 * conn_idx - index into connections[]
 *
 * The round-trip time is always recorded in the operation statistics. The
 * local listening flags of the connection are cleared only when the server
 * reports success. A failed command is logged (with channel index -1) and
 * leaves the tracked state untouched.
 */
void execute_unlisten_all(int conn_idx) {
    static const char query[] = "UNLISTEN *";

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    PGresult *res = PQexec(connections[conn_idx], query);
    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed_ms = (end.tv_sec - start.tv_sec) * 1000.0 +
                        (end.tv_nsec - start.tv_nsec) / 1000000.0;
    update_operation_stats(OP_UNLISTEN_ALL, elapsed_ms);

    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
        // The command failed, so the tracked state must not change.
        log_error(conn_idx, -1, query, res);
        PQclear(res);
        return;
    }
    PQclear(res);

    // Command succeeded: clear all listening state for this connection
    for (int chan_idx = 0; chan_idx < current_channel_count; chan_idx++) {
        if (listening[conn_idx][chan_idx]) {
            listening[conn_idx][chan_idx] = false;
            count_listening_pairs--;
        }
    }
}
/**
* Execute NOTIFY on a specific (connection, channel) pair.
* Sends a notification with a random hex ID payload and tracks it for correctness checking.
*/
void execute_notify(int conn_idx, int chan_idx) {
// Generate random hex ID for this notification
char id_str[33];
generate_random_hex(id_str);
// Build and execute NOTIFY query
char query[200];
snprintf(query, sizeof(query), "NOTIFY \"%s\", '%s'", channels[chan_idx], id_str);
// Get pending notification tracking structure and record delivery start time
// BEFORE sending the notification
PendingNotification *pn = &pending_notifications[chan_idx];
clock_gettime(CLOCK_MONOTONIC, &pn->delivery_start);
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
PGresult *res = PQexec(connections[conn_idx], query);
clock_gettime(CLOCK_MONOTONIC, &end);
double elapsed_ms = (end.tv_sec - start.tv_sec) * 1000.0 +
(end.tv_nsec - start.tv_nsec) / 1000000.0;
update_operation_stats(OP_NOTIFY, elapsed_ms);
ExecStatusType status = PQresultStatus(res);
if (status != PGRES_COMMAND_OK) {
log_error(conn_idx, chan_idx, query, res);
PQclear(res);
return;
}
PQclear(res);
// Track this notification for correctness checking
strcpy(pn->notification_id, id_str);
pn->pending = true;
pn->sent_time = time(NULL);
pn->delivery_recorded = false;
// Mark which connections should receive this notification and count listeners
pn->listener_count = 0;
for (int i = 0; i < current_connection_count; i++) {
pn->expected[i] = listening[i][chan_idx];
pn->received[i] = false;
if (pn->expected[i]) {
pn->listener_count++;
}
}
}
/**
* Execute random operations based on independent Bernoulli trials.
* Runs 4 independent trials (LISTEN, UNLISTEN, UNLISTEN *, NOTIFY).
* Each successful trial executes its corresponding operation independently.
* May execute 0-4 operations per call depending on trial results.
* Called every tick in the main event loop.
*/
void execute_random_operations(BenchmarkConfig *config) {
if (current_connection_count == 0 || current_channel_count == 0) {
return;
}
double listen_prob = config->listen_prob / 100.0;
double unlisten_prob = config->unlisten_prob / 100.0;
double notify_prob = config->notify_prob / 100.0;
double unlisten_all_prob = config->unlisten_all_prob / 100.0;
// Run 4 independent Bernoulli trials
bool try_listen = bernoulli_trial(listen_prob);
bool try_unlisten = bernoulli_trial(unlisten_prob);
bool try_unlisten_all = bernoulli_trial(unlisten_all_prob);
bool try_notify = bernoulli_trial(notify_prob);
// Execute each successful trial independently
if (try_listen) {
int conn_idx = rand() % current_connection_count;
int chan_idx = rand() % current_channel_count;
execute_listen_unlisten(conn_idx, chan_idx, true);
}
if (try_unlisten) {
int conn_idx = rand() % current_connection_count;
int chan_idx = rand() % current_channel_count;
execute_listen_unlisten(conn_idx, chan_idx, false);
}
if (try_unlisten_all) {
int conn_idx = rand() % current_connection_count;
execute_unlisten_all(conn_idx);
}
if (try_notify) {
// Build list of eligible channels (no pending notification and has listeners)
int eligible_channels[MAX_CHANNELS];
int eligible_count = 0;
for (int chan_idx = 0; chan_idx < current_channel_count; chan_idx++) {
// Check if no pending notification
if (!pending_notifications[chan_idx].pending) {
// Check if has listeners
bool has_listeners = false;
for (int i = 0; i < current_connection_count; i++) {
if (listening[i][chan_idx]) {
has_listeners = true;
break;
}
}
if (has_listeners) {
eligible_channels[eligible_count++] = chan_idx;
}
}
}
// If there are eligible channels, pick one randomly
if (eligible_count > 0) {
int selected_idx = rand() % eligible_count;
int chan_idx = eligible_channels[selected_idx];
int conn_idx = rand() % current_connection_count;
execute_notify(conn_idx, chan_idx);
}
}
}
/**
 * Record a correctness error and append its details to errors.log.
 *
 * error_type  - short description of the violation
 * conn_idx    - index of the connection involved
 * channel     - channel name involved
 * expected_id - notification ID that was expected, or NULL to omit the line
 * received_id - notification ID that was received, or NULL to omit the line
 *
 * The error counter is incremented unconditionally, before any file I/O, so
 * an unwritable log file cannot hide errors from the final summary and exit
 * status. The log file is opened on first use (append mode); if it cannot be
 * opened only the written record is lost.
 */
void log_correctness_error(const char *error_type, int conn_idx, const char *channel, const char *expected_id, const char *received_id) {
    // Count first: the error happened whether or not it can be written down
    correctness_error_count++;

    if (!error_log) {
        error_log = fopen("errors.log", "a");
        if (!error_log) {
            return;
        }
    }

    time_t now = time(NULL);
    char *timestamp = ctime(&now);
    timestamp[strlen(timestamp) - 1] = '\0'; // Remove newline

    fprintf(error_log, "[%s] CORRECTNESS ERROR: %s\n", timestamp, error_type);
    fprintf(error_log, "Connection: %d\n", conn_idx);
    fprintf(error_log, "Channel: %s\n", channel);
    if (expected_id) {
        fprintf(error_log, "Expected ID: %s\n", expected_id);
    }
    if (received_id) {
        fprintf(error_log, "Received ID: %s\n", received_id);
    }
    fprintf(error_log, "---\n");
    fflush(error_log);
}
/**
* Process incoming notifications from all connections.
* Validates received notifications against expected pending notifications.
* Logs any correctness errors (wrong ID, unexpected channel, unexpected receiver).
*/
void process_notifications() {
for (int conn_idx = 0; conn_idx < current_connection_count; conn_idx++) {
// Consume any incoming data
PQconsumeInput(connections[conn_idx]);
// Process all pending notifications
PGnotify *notify;
while ((notify = PQnotifies(connections[conn_idx])) != NULL) {
// Find the channel index
int chan_idx = -1;
for (int j = 0; j < current_channel_count; j++) {
if (strcmp(channels[j], notify->relname) == 0) {
chan_idx = j;
break;
}
}
// Validate: channel should exist
if (chan_idx == -1) {
log_correctness_error("Unexpected channel", conn_idx, notify->relname, NULL, notify->extra);
PQfreemem(notify);
continue;
}
PendingNotification *pn = &pending_notifications[chan_idx];
// Validate: should have a pending notification for this channel
if (!pn->pending) {
log_correctness_error("No pending notification", conn_idx, notify->relname, NULL, notify->extra);
PQfreemem(notify);
continue;
}
// Validate: ID should match
if (strcmp(pn->notification_id, notify->extra) != 0) {
log_correctness_error("ID mismatch", conn_idx, notify->relname, pn->notification_id, notify->extra);
PQfreemem(notify);
continue;
}
// Validate: this connection should have been expected to receive it
if (!pn->expected[conn_idx]) {
log_correctness_error("Unexpected receiver", conn_idx, notify->relname, pn->notification_id, notify->extra);
PQfreemem(notify);
continue;
}
// Mark as received
pn->received[conn_idx] = true;
// Record delivery time on first receipt
if (!pn->delivery_recorded) {
struct timespec end;
clock_gettime(CLOCK_MONOTONIC, &end);
double elapsed_ms = (end.tv_sec - pn->delivery_start.tv_sec) * 1000.0 +
(end.tv_nsec - pn->delivery_start.tv_nsec) / 1000000.0;
update_operation_stats(OP_NOTIFY_DELIVERY, elapsed_ms);
// Update histogram
for (int bucket = 0; bucket < HISTOGRAM_BUCKETS; bucket++) {
if (elapsed_ms < histogram_bounds[bucket]) {
histogram_counts[bucket]++;
break;
}
}
pn->delivery_recorded = true;
}
// Check if all expected connections have received the notification
bool all_received = true;
for (int i = 0; i < current_connection_count; i++) {
if (pn->expected[i] && !pn->received[i]) {
all_received = false;
break;
}
}
// If all expected connections received it, clear the pending notification
if (all_received) {
pn->pending = false;
}
PQfreemem(notify);
}
}
}
/**
 * Report notifications that were sent but not fully received in time.
 *
 * For every channel whose pending notification is more than 5 seconds old,
 * one "Missing notification (timeout)" correctness error is logged per
 * expected connection that has not received it, and the pending record is
 * then cleared.
 */
void check_for_missing_notifications() {
    const time_t now = time(NULL);
    const int timeout_seconds = 5;

    for (int chan = 0; chan < current_channel_count; chan++) {
        PendingNotification *record = &pending_notifications[chan];

        if (!record->pending) {
            continue;
        }
        if (!((now - record->sent_time) > timeout_seconds)) {
            continue; // Still within the grace period
        }

        // Name every expected receiver that never got it
        for (int conn = 0; conn < current_connection_count; conn++) {
            const bool missing = record->expected[conn] && !record->received[conn];
            if (missing) {
                log_correctness_error("Missing notification (timeout)", conn, channels[chan], record->notification_id, NULL);
            }
        }

        record->pending = false; // Stale record: stop tracking it
    }
}
/* ============================================================================
* OUTPUT FUNCTIONS
* ============================================================================ */
/**
 * Print a text histogram of NOTIFY delivery times to stdout.
 *
 * Empty buckets are omitted. Bars are scaled so the fullest bucket is 50
 * characters wide and every non-empty bucket shows at least one character;
 * each line also shows the raw count and its share of all samples. When no
 * samples were recorded a short notice is printed instead.
 */
void print_delivery_histogram() {
    printf("========================================================\n");
    printf("NOTIFY Delivery Time Distribution:\n");
    printf("\n");

    // One pass gives both the sample total and the tallest bucket
    unsigned long sample_total = 0;
    unsigned long tallest = 0;
    for (int b = 0; b < HISTOGRAM_BUCKETS; b++) {
        const unsigned long c = histogram_counts[b];
        sample_total += c;
        if (c > tallest) {
            tallest = c;
        }
    }

    if (sample_total == 0) {
        printf("No delivery times recorded.\n\n");
        return;
    }

    const int full_width = 50;
    for (int b = 0; b < HISTOGRAM_BUCKETS; b++) {
        const unsigned long c = histogram_counts[b];
        if (c == 0) {
            continue; // Nothing to draw for this bucket
        }

        const double share = (c * 100.0) / sample_total;
        int width = (int)((c * full_width) / tallest);
        if (width == 0) {
            width = 1; // Keep non-empty buckets visible
        }

        printf("%-12s ", histogram_labels[b]);
        for (int remaining = width; remaining > 0; remaining--) {
            printf("#");
        }
        printf(" %lu (%.1f%%)\n", c, share);
    }
    printf("\n");
}
/**
 * Print the end-of-run report to stdout.
 *
 * The report contains the configuration that was used, the final pool and
 * listening state, a min/avg/max timing table per operation type (dashes for
 * operations that never ran), the delivery-time histogram, and a warning line
 * when correctness errors were detected.
 */
void print_statistics(BenchmarkConfig *config) {
    static const char heavy_rule[] = "========================================================\n";
    static const char light_rule[] = "--------------------------------------------------------\n";

    fputs("\n", stdout);
    fputs(heavy_rule, stdout);
    fputs("PostgreSQL LISTEN/NOTIFY Benchmark Results\n", stdout);
    fputs(heavy_rule, stdout);
    fputs("\n", stdout);

    fputs("Configuration:\n", stdout);
    printf(" Connections: %d\n", (int)config->connections);
    printf(" Channels: %d\n", (int)config->channels);
    printf(" Tick interval: %.0f ms\n", config->tick_ms);
    printf(" LISTEN probability: %.4f%%\n", config->listen_prob);
    printf(" UNLISTEN probability: %.4f%%\n", config->unlisten_prob);
    printf(" NOTIFY probability: %.4f%%\n", config->notify_prob);
    printf(" UNLISTEN * probability: %.4f%%\n", config->unlisten_all_prob);
    printf(" Ticks executed: %d\n", config->max_ticks);
    printf(" Random seed: %u\n", config->seed);
    fputs("\n", stdout);

    fputs("Final State:\n", stdout);
    printf(" Active connections: %d\n", current_connection_count);
    printf(" Active channels: %d\n", current_channel_count);
    printf(" Listening pairs: %d\n", count_listening_pairs);
    printf(" Correctness errors: %d\n", correctness_error_count);
    fputs("\n", stdout);

    fputs(heavy_rule, stdout);
    fputs("Operation Statistics:\n", stdout);
    printf("%-15s %10s %12s %12s %12s\n", "Operation", "Count", "Min(ms)", "Avg(ms)", "Max(ms)");
    fputs(light_rule, stdout);

    for (int op = 0; op < OP_TYPE_COUNT; op++) {
        const OperationStats *row = &operation_stats[op];
        if (row->count == 0) {
            // Never executed: no timings to show
            printf("%-15s %10lu %12s %12s %12s\n",
                   operation_names[op], 0UL, "-", "-", "-");
            continue;
        }
        const double mean_ms = row->total_ms / row->count;
        printf("%-15s %10lu %12.3f %12.3f %12.3f\n",
               operation_names[op], row->count, row->min_ms, mean_ms, row->max_ms);
    }
    fputs("\n", stdout);

    print_delivery_histogram();

    if (correctness_error_count > 0) {
        printf("WARNING: %d correctness errors detected. See errors.log for details.\n", correctness_error_count);
    }
}
/**
 * Print the command-line help text to stdout.
 *
 * program_name - name shown in the usage line and in the example commands
 */
void print_usage(const char *program_name) {
    const char *prog = program_name;

    printf("Usage: %s [OPTIONS]\n", prog);
    fputs("\n", stdout);
    fputs("PostgreSQL LISTEN/NOTIFY Benchmark Tool (CLI version)\n", stdout);
    fputs("\n", stdout);

    fputs("Options:\n", stdout);
    fputs(" -c, --connections N Number of database connections (default: 1)\n", stdout);
    fputs(" -n, --channels N Number of notification channels (default: 1)\n", stdout);
    fputs(" -t, --tick-ms N Tick interval in milliseconds (default: 1)\n", stdout);
    // The probability lines contain a literal percent sign, so they stay printf formats
    printf(" -l, --listen-prob N LISTEN probability %% (default: 0.1, 0 to disable)\n");
    printf(" -u, --unlisten-prob N UNLISTEN probability %% (default: 0.05, 0 to disable)\n");
    printf(" -p, --notify-prob N NOTIFY probability %% (default: 1.0, 0 to disable)\n");
    printf(" -a, --unlisten-all-prob N UNLISTEN * probability %% (default: 0.01, 0 to disable)\n");
    fputs(" -T, --ticks N Number of ticks to run (REQUIRED)\n", stdout);
    fputs(" -s, --seed N Random seed for reproducibility (default: current time)\n", stdout);
    fputs(" -h, --help Show this help message\n", stdout);
    fputs("\n", stdout);

    fputs("Environment variables:\n", stdout);
    fputs(" PGHOST PostgreSQL host (default: localhost)\n", stdout);
    fputs(" PGPORT PostgreSQL port (default: 5432)\n", stdout);
    fputs(" PGUSER PostgreSQL user (default: current user)\n", stdout);
    fputs(" PGPASSWORD PostgreSQL password (default: none)\n", stdout);
    fputs("\n", stdout);

    fputs("Examples:\n", stdout);
    printf(" %s -T 1000 # Minimal: 1 connection, 1 channel, 1000 ticks\n", prog);
    printf(" %s -c 10 -n 5 -T 1000\n", prog);
    printf(" %s --connections 100 --channels 10 --ticks 5000 --notify-prob 2.0\n", prog);
    printf(" %s -c 10 -n 5 -T 1000 -s 42 # Reproducible run with seed 42\n", prog);
    printf(" %s -c 10 -n 5 -T 1000 --listen-prob 0 --unlisten-prob 0 # Only NOTIFY\n", prog);
}
/* ============================================================================
* MAIN PROGRAM
* ============================================================================ */
// Largest accepted tick interval (24 hours); keeps the conversion to a
// timespec well within range.
static const double max_tick_ms = 86400000.0;

/**
 * Parse `text` as a finite decimal number that spans the whole string.
 * Returns true and stores the value in *out on success. Returns false for
 * empty input, trailing garbage, out-of-range values, NaN and infinity.
 */
static bool parse_double_arg(const char *text, double *out) {
    char *end = NULL;
    errno = 0;
    double value = strtod(text, &end);
    if (end == text || *end != '\0' || errno == ERANGE || !isfinite(value)) {
        return false;
    }
    *out = value;
    return true;
}

/**
 * Parse `text` as a base-10 integer that spans the whole string.
 * Returns true and stores the value in *out on success. Returns false for
 * empty input, trailing garbage and values that do not fit in a long.
 */
static bool parse_long_arg(const char *text, long *out) {
    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return false;
    }
    *out = value;
    return true;
}

/**
 * Sleep for `tick_ms` milliseconds, honoring fractional values.
 * Non-positive intervals return immediately. An interrupted sleep is not resumed.
 */
static void sleep_tick(double tick_ms) {
    if (tick_ms <= 0.0) {
        return;
    }
    struct timespec interval;
    interval.tv_sec = (time_t)(tick_ms / 1000.0);
    long nsec = (long)((tick_ms - (double)interval.tv_sec * 1000.0) * 1000000.0);
    // Guard against floating-point rounding pushing tv_nsec out of range
    if (nsec < 0) {
        nsec = 0;
    } else if (nsec > 999999999L) {
        nsec = 999999999L;
    }
    interval.tv_nsec = nsec;
    nanosleep(&interval, NULL);
}

/**
 * Program entry point: parse options, set up connections and channels, run
 * the benchmark loop for the requested number of ticks, print the report and
 * clean up.
 *
 * Returns 0 on a clean run. Returns 1 when an option is invalid, when
 * --ticks is missing, when a connection or channel cannot be created, or
 * when any correctness error was detected.
 */
int main(int argc, char *argv[]) {
    BenchmarkConfig config = {
        .connections = 1.0,
        .channels = 1.0,
        .tick_ms = 1.0,
        .listen_prob = 0.1,
        .unlisten_prob = 0.05,
        .notify_prob = 1.0,
        .unlisten_all_prob = 0.01,
        .max_ticks = -1,
        .seed = 0
    };
    int exit_status = 0;

    // Command-line option parsing
    static struct option long_options[] = {
        {"connections", required_argument, 0, 'c'},
        {"channels", required_argument, 0, 'n'},
        {"tick-ms", required_argument, 0, 't'},
        {"listen-prob", required_argument, 0, 'l'},
        {"unlisten-prob", required_argument, 0, 'u'},
        {"notify-prob", required_argument, 0, 'p'},
        {"unlisten-all-prob", required_argument, 0, 'a'},
        {"ticks", required_argument, 0, 'T'},
        {"seed", required_argument, 0, 's'},
        {"help", no_argument, 0, 'h'},
        {0, 0, 0, 0}
    };

    int opt;
    while ((opt = getopt_long(argc, argv, "c:n:t:l:u:p:a:T:s:h", long_options, NULL)) != -1) {
        switch (opt) {
        case 'c':
            if (!parse_double_arg(optarg, &config.connections) ||
                config.connections < 0 || config.connections > MAX_CONNECTIONS) {
                fprintf(stderr, "Error: connections must be between 0 and %d\n", MAX_CONNECTIONS);
                return 1;
            }
            break;
        case 'n':
            if (!parse_double_arg(optarg, &config.channels) ||
                config.channels < 0 || config.channels > MAX_CHANNELS) {
                fprintf(stderr, "Error: channels must be between 0 and %d\n", MAX_CHANNELS);
                return 1;
            }
            break;
        case 't':
            if (!parse_double_arg(optarg, &config.tick_ms) ||
                config.tick_ms < 0 || config.tick_ms > max_tick_ms) {
                fprintf(stderr, "Error: tick-ms must be between 0 and %.0f\n", max_tick_ms);
                return 1;
            }
            break;
        case 'l':
            if (!parse_double_arg(optarg, &config.listen_prob) ||
                config.listen_prob < 0 || config.listen_prob > MAX_PERCENTAGE) {
                fprintf(stderr, "Error: listen-prob must be between 0 and %.0f\n", MAX_PERCENTAGE);
                return 1;
            }
            break;
        case 'u':
            if (!parse_double_arg(optarg, &config.unlisten_prob) ||
                config.unlisten_prob < 0 || config.unlisten_prob > MAX_PERCENTAGE) {
                fprintf(stderr, "Error: unlisten-prob must be between 0 and %.0f\n", MAX_PERCENTAGE);
                return 1;
            }
            break;
        case 'p':
            if (!parse_double_arg(optarg, &config.notify_prob) ||
                config.notify_prob < 0 || config.notify_prob > MAX_PERCENTAGE) {
                fprintf(stderr, "Error: notify-prob must be between 0 and %.0f\n", MAX_PERCENTAGE);
                return 1;
            }
            break;
        case 'a':
            if (!parse_double_arg(optarg, &config.unlisten_all_prob) ||
                config.unlisten_all_prob < 0 || config.unlisten_all_prob > MAX_PERCENTAGE) {
                fprintf(stderr, "Error: unlisten-all-prob must be between 0 and %.0f\n", MAX_PERCENTAGE);
                return 1;
            }
            break;
        case 'T': {
            long ticks = 0;
            if (!parse_long_arg(optarg, &ticks) || ticks <= 0 || ticks > INT_MAX) {
                fprintf(stderr, "Error: ticks must be a positive integer\n");
                return 1;
            }
            config.max_ticks = (int)ticks;
            break;
        }
        case 's': {
            long seed_value = 0;
            // Negative values remain accepted and wrap to unsigned, as before
            if (!parse_long_arg(optarg, &seed_value) || seed_value < INT_MIN ||
                (seed_value > 0 && (unsigned long)seed_value > UINT_MAX)) {
                fprintf(stderr, "Error: seed must be an integer\n");
                return 1;
            }
            config.seed = (unsigned int)seed_value;
            break;
        }
        case 'h':
            print_usage(argv[0]);
            return 0;
        default:
            print_usage(argv[0]);
            return 1;
        }
    }

    // Validate required arguments
    if (config.max_ticks < 0) {
        fprintf(stderr, "Error: --ticks is required\n\n");
        print_usage(argv[0]);
        return 1;
    }

    // Seed 0 means "not specified": derive one from the clock and remember it
    // so the report shows the seed needed to reproduce the run
    if (config.seed == 0) {
        config.seed = (unsigned int)time(NULL);
    }
    srand(config.seed);

    // Start from a clean slate: nobody listening, nothing pending, no samples
    memset(listening, 0, sizeof(listening));
    memset(pending_notifications, 0, sizeof(pending_notifications));
    memset(operation_stats, 0, sizeof(operation_stats));
    memset(histogram_counts, 0, sizeof(histogram_counts));

    // Setup connections and channels to target values
    int target_connections = (int)config.connections;
    int target_channels = (int)config.channels;
    printf("Initializing %d connections and %d channels...\n", target_connections, target_channels);

    while (current_connection_count < target_connections) {
        if (!add_connection()) {
            fprintf(stderr, "Error: Failed to create connection %d\n", current_connection_count + 1);
            exit_status = 1; // A failed setup must not be reported as success
            goto cleanup;
        }
    }

    while (current_channel_count < target_channels) {
        if (!add_channel()) {
            fprintf(stderr, "Error: Failed to create channel %d\n", current_channel_count + 1);
            exit_status = 1;
            goto cleanup;
        }
    }

    printf("Initialization complete. Starting benchmark...\n");

    // Main benchmark loop
    for (int tick = 0; tick < config.max_ticks; tick++) {
        execute_random_operations(&config);
        process_notifications();

        // Check for missing notifications every 100 ticks
        if ((tick + 1) % 100 == 0) {
            check_for_missing_notifications();
        }

        sleep_tick(config.tick_ms);
    }

    // Final check for missing notifications
    check_for_missing_notifications();

    print_statistics(&config);

cleanup:
    while (current_connection_count > 0) {
        remove_connection();
    }
    while (current_channel_count > 0) {
        remove_channel();
    }
    if (error_log) {
        fclose(error_log);
    }

    if (correctness_error_count > 0) {
        exit_status = 1;
    }
    return exit_status;
}