/* sump.c - SUMP Pump(TM) SMP/CMP parallel data pump library. * SUMP Pump is a trademark of Ordinal Technology Corp * * $Revision: 124 $ * * Copyright (C) 2010 - 2011, Ordinal Technology Corp, http://www.ordinal.com * * This program is free software; you can redistribute it and/or * modify it under the terms of Version 2 of the GNU General Public * License as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * Linking SUMP Pump statically or dynamically with other modules is * making a combined work based on SUMP Pump. Thus, the terms and * conditions of the GNU General Public License v.2 cover the whole * combination. * * In addition, as a special exception, the copyright holders of SUMP Pump * give you permission to combine SUMP Pump program with free software * programs or libraries that are released under the GNU LGPL and with * independent modules that communicate with SUMP Pump solely through * Ordinal Technology Corp's Nsort Subroutine Library interface as defined * in the Nsort User Guide, http://www.ordinal.com/NsortUserGuide.pdf. * You may copy and distribute such a system following the terms of the * GNU GPL for SUMP Pump and the licenses of the other code concerned, * provided that you include the source code of that other code when and * as the GNU GPL requires distribution of source code. * * Note that people who make modified versions of SUMP Pump are not * obligated to grant this special exception for their modified * versions; it is their choice whether to do so. 
The GNU General * Public License gives permission to release a modified version without * this exception; this exception also makes it possible to release a * modified version which carries forward this exception. * * For more information on SUMP Pump, see: * http://www.ordinal.com/sump.html * http://code.google.com/p/sump-pump/ */ # define AIO_CAPABLE #if !defined(_WIN32) # define _GNU_SOURCE # include # include # include # include # include # include # include # include # include # include # include # if !defined(__CYGWIN32__) # include # include # include # endif # define PTFlld "lld" # define PTFllu "llu" # define PTFllx "llx" # define ERRNO errno #else /* now defined(_WIN32) */ # define PTFlld "I64d" # define PTFllu "I64u" # define PTFllx "I64x" # define ERRNO GetLastError() #endif /* !defined(_WIN32) */ #include "sump.h" #include "sumpversion.h" #include #include #include #include #include #include #include #include #if defined(SUMP_PUMP_NO_SORT) /* define some nsort typedefs to minimize the number of #if's in this file */ typedef unsigned nsort_t; /* nsort context identifier */ typedef int nsort_msg_t; /* return status & error message numbers */ #else /* use Nsort include files */ # include "nsort.h" # include "nsorterrno.h" #endif /* values for the flags sump pump structure member */ #define SP_UNICODE 0x0001 /* not yet supported */ #define SP_UTF_8 0x0002 /* input records are utf-8 * characters */ #define SP_ASCII SP_UTF_8 #define SP_FIXED 0x0003 /* input records are a fixed * number of bytes */ #define SP_WHOLE_BUF 0x0004 /* there are no input records, * instead there is only an * input buffer */ #define SP_REC_TYPE_MASK 0x0007 #define SP_SORT 0x0008 #define SP_GROUP_BY 0x0010 #define SP_EXEC 0x0020 #define REC_TYPE(sp) ((sp)->flags & SP_REC_TYPE_MASK) #define TRUE 1 #define FALSE 0 /* sort_state values */ #define SORT_INPUT 1 #define SORT_OUTPUT 2 #define SORT_DONE 3 #define ERROR_BUF_SIZE 500 /* size of error buffer */ #define 
DEFAULT_BUFFERED_TRANSFER_SIZE (1024 * 1024)
#define DEFAULT_PIPE_TRANSFER_SIZE 8192

/* state structure for a sump pump instance */
struct sump
{
    unsigned flags;             /* caller-defined bit flags,
                                 * see sump.h */
    sp_pump_t pump_func;        /* caller-defined pump function that
                                 * is executed in parallel by the
                                 * sump pump */
    void *pump_arg;             /* caller-defined arg to pump func */
    unsigned num_tasks;         /* number of pump tasks */
    unsigned num_threads;       /* number of threads executing the
                                 * pump func */
    unsigned num_in_bufs;       /* number of input buffers */
    unsigned num_outputs;       /* number of sump pump output channels */
    ssize_t in_buf_size;        /* input buffer size in bytes */
    struct sump_out *out;       /* array of output structures, one for
                                 * each output */
    void *delimiter;            /* record delimiter, for text input */
    size_t rec_size;            /* record size */
    pthread_mutex_t sump_mtx;   /* mutex for sump pump infrastructure */
    pthread_mutex_t sp_mtx;     /* mutex for pump funcs to use
                                 * via sp_mutex_lock() and
                                 * sp_mutex_unlock() calls */
    pthread_cond_t in_buf_readable_cond; /* input buffer available for
                                          * reading by pump funcs */
    pthread_cond_t in_buf_done_cond;    /* an input buffer has been
                                         * completely read by all potential
                                         * pump threads and can be reused */
    pthread_cond_t task_avail_cond;     /* a task is available for the
                                         * taking by a sump pump thread */
    pthread_cond_t task_drained_cond;   /* a task has been completely
                                         * executed and its output has
                                         * been drained (read) for its
                                         * output buffer(s) */
    pthread_cond_t task_output_ready_cond; /* a task's output ready to
                                            * be read */
    pthread_cond_t task_output_empty_cond; /* a task's output buffer has
                                            * been read and is empty */
    size_t in_buf_current_bytes; /* bytes in current input buf */
    /* number of bytes at end of prev input buffer containing a partial rec */
    size_t prev_in_buf_ending_rec_partial_bytes;
    pthread_t *thread;          /* array of sump pump threads */
    uint64_t cnt_in_buf_readable; /* number of input buffers that
                                   * have been filled with input
                                   * data and are available for
                                   * reading by sump pump threads
                                   * executing pump functions */
    uint64_t cnt_in_buf_done;   /* number of input buffers
                                 * that have been read by all
                                 * their readers */
    uint64_t cnt_task_init;     /* number of tasks initialized and
                                 * available for the taking by any
                                 * sump pump thread */
    uint64_t cnt_task_begun;    /* number of tasks allocated/taken
                                 * and begun by sump pump threads */
    uint64_t cnt_task_drained;  /* number of tasks that have
                                 * been completed and had all
                                 * their output buffer(s)
                                 * completely read/drained. */
    uint64_t cnt_task_done;     /* number of done tasks whose
                                 * actual ending position has been
                                 * verified to be the same as their
                                 * expected ending */
    struct sp_task *task;       /* array of sump pump tasks */
    struct in_buf *in_buf;      /* array of sump pump input buffers */
    nsort_t nsort_ctx;          /* used only if this is a sort */
    char *error_buf;            /* buf to hold error msg */
    size_t error_buf_size;      /* size in bytes of error_buf */
    int error_code;             /* pump func generated error code */
    unsigned sort_error;        /* sort error code */
    char *sort_temp_buf;        /* sort temporary buf */
    size_t sort_temp_buf_size;  /* size of sort temporary buf */
    size_t sort_temp_buf_bytes; /* bytes of data in temp buf */
    char input_eof;             /* sp_write_input() called with
                                 * size <= 0 */
    char broken_input;          /* sp_write_input() called with
                                 * a negative size */
    char sort_state;            /* only used for sorting; one of
                                 * SORT_INPUT, SORT_OUTPUT, SORT_DONE */
    char match_keys;            /* only used for sorting - indicates
                                 * -match has been specified */
    char wait_done;             /* sp_wait already called for this sp */
    char in_file_alloc;         /* in_file string was malloc()'d and
                                 * should be free()'d */
    char *in_file;              /* input file str or NULL if none */
    struct sp_file *in_file_sp; /* input file of sump pump */
    struct exec_state *ex_state; /* used when internal pump func invokes
                                  * an external executable program.
                                  * one state per sump pump thread */
    char **exec_argv;           /* exec process command line */
};

/* struct for an output of a task */
struct task_out
{
    char *buf;          /* output buffer where map task should
                         * write its results */
    size_t size;        /* capacity of output buffer */
    size_t bytes_copied; /* number of bytes written so far into
                          * output buffer */
    char stalled;       /* the map thread handling this task is
                         * stalled waiting for the writer thread to
                         * empty its full buf */
};

#define INVALID_FD (-1)
#define PIPE_BUF_SIZE 4096

/* macro to allow the stderr of external programs performing pump functions
 * to be the second output of the sump pump.  The initial implementation of
 * external programs for pump functions did gather stderr as the second
 * output; but it seemed problematic because when an external program failed
 * and wrote an error message to its standard error, the sump pump thread
 * that reads the pump func input and writes it to the standard input of
 * the external program would notice that the external program had terminated
 * abnormally and would set the error code for the sump pump; then the
 * thread reading the stderr of the external program would notice the sump
 * pump error code and exit before it would read the error message in the
 * stderr; thus the error message never made it through sump pump.
* #define SUMP_PIPE_STDERR
 */

/* per-pipe struct used when pump funcs call a separate program */
struct std_pipe
{
    struct exec_state *ex;      /* exec state this pipe belongs to */
    int perrno;                 /* error number recorded for this pipe;
                                 * pipe_reader() stores ERRNO here when a
                                 * read from the pipe fails */
#if defined(win_nt)
    HANDLE rd_h;                /* read handle */
    HANDLE wr_h;                /* write handle */
#else
    int rd_fd;                  /* read file descriptor */
    int wr_fd;                  /* write file descriptor */
#endif
};

/* per-thread struct used when pump funcs call a separate program */
struct exec_state
{
    sp_task_t t;                        /* task currently using this state */
    struct std_pipe in;                 /* pipe to the external program's
                                         * stdin */
    char in_buf[PIPE_BUF_SIZE];
    struct std_pipe out;                /* pipe from the external program's
                                         * stdout */
    char out_buf[PIPE_BUF_SIZE];        /* buffer pipe_reader() reads the
                                         * stdout pipe into */
#if defined(SUMP_PIPE_STDERR)
    struct std_pipe err;                /* pipe from the external program's
                                         * stderr */
    char err_buf[PIPE_BUF_SIZE];        /* buffer pipe_reader() reads the
                                         * stderr pipe into */
#endif
};

/* struct for a sump pump task */
struct sp_task
{
    struct sump *sp;            /* the "host" sp_t of this task */
    uint64_t task_number;       /* task number */
    int thread_index;           /* id of thread performing this task */
    char *in_buf;               /* input buffer */
    size_t in_buf_bytes;        /* number of bytes written into in_buf
                                 * by the reader thread. these bytes
                                 * will be read out by map task */
    char *rec_buf;              /* buf to hold rec returned by pf_get_rec() */
    size_t rec_buf_size;        /* size of rec_buf */
    char *curr_rec;             /* pointer to the current record */
    char *temp_buf;             /* temp buf to help with printf */
    size_t temp_buf_size;       /* size of the temp buf */
    char *error_buf;            /* error message posted by sp_error() call */
    size_t error_buf_size;      /* size of the error message buf */
    int error_code;             /* pump func generated error code */
    int sort_error;             /* nsort error code */
    uint64_t curr_in_buf_index; /* current input buffer index */
    char *begin_rec;            /* pointer to the beginning record */
    uint64_t begin_in_buf_index;/* beginning input buffer index */
    /* the following 2 members are set by the thread calling sp_write_input() */
    uint64_t expected_end_index; /* expected end in buf index */
    int expected_end_offset;    /* expected end in buf offset */
    char first_group_rec;       /* next record read will be the first
                                 * record for its record group */
    char first_in_buf;          /* this task is still reading its
                                 * first input buffer */
    char input_eof;             /* boolean: this task is done reading
                                 * its input */
    char output_eof;            /* boolean: thread performing task is
                                 * done writing its output to its output
                                 * buffer, but we may still need to wait
                                 * until the output buffer has been read
                                 * before the task is done */
    int outs_drained;           /* number of outputs for this task that
                                 * have been completely drained (read) */
    struct task_out *out;       /* array of task outputs */
};

/* struct for a sump pump input buffer */
typedef struct in_buf
{
    char *in_buf;               /* input buffer */
    size_t in_buf_bytes;        /* number of bytes written into in_buf
                                 * by the reader thread. these bytes
                                 * will be read out by map task */
    size_t in_buf_size;         /* size of the in_buf */
    size_t alloc_size;          /* allocation size of the in_buf */
    unsigned num_readers;       /* number of threads performing
                                 * tasks that read this buf */
    unsigned num_readers_done;  /* number of reader threads that are
                                 * done with this buffer */
} in_buf_t;

/* struct for a link (copy thread) between an output of one sump pump and
 * the input of another.
*/ struct sp_link { struct sump *out_sp; /* sp_t we are reading from */ unsigned out_index; /* output index of read sp_t */ struct sump *in_sp; /* sp_t we are writing to */ size_t buf_size; /* buf size */ char *buf; /* temp buf for transfering data */ pthread_t thread; /* thread executing link_main() */ int error_code; /* error code */ }; /* struct for a file reader or writer thread */ struct sp_file { char *fname; /* file name */ pthread_t thread; /* thread executing either file_reader() or * file_writer() */ sp_t sp; /* sump pump this file */ int mode; /* file access mode */ #if defined(win_nt) HANDLE fd; /* file handle */ #else int fd; /* file descriptor */ #endif char wait_done; /* sp_wait already called for this sp_file */ int out_index; /* sump pump output index (if relevant) */ int aio_count; /* the max and target number of async i/o's */ size_t transfer_size; /* read or write request size */ int error_code; /* error code */ int can_seek; /* if true, then direct/async-capable file */ int is_std; /* file is either stdin, stdout or stderr */ }; /* file access modes */ #define MODE_UNSPECIFIED 0 /* no access mode has been specified */ #define MODE_BUFFERED 1 /* use standard read() or write() calls */ #define MODE_DIRECT 2 /* direct and asynchronous r/w requests */ /* struct for a sump pump output */ struct sump_out { size_t buf_size; /* size of each task's corresponding * output buffer for this output */ int size_specified; /* boolean: non-zero if out buf size * has been set */ double buf_size_mult;/* factor increase over input buf size */ size_t partial_bytes_copied; /* the number of bytes copied * from the task output buffer * currently being read from */ char file_alloc; /* file name was malloc()'d and * should be free()'d */ char *file; /* output file str or NULL if none */ struct sp_file *file_sp; /* output file for this sump pump out */ uint64_t cnt_task_drained; /* number of tasks that have * been completed and their * output buffer for this * particular 
output has been * completely read */ }; #if defined(AIO_CAPABLE) /* sump aio struct */ struct sump_aio { uint64_t buf_index; /* sump pump buffer index */ size_t buf_offset; /* beginning io offset within buffer */ char last_buf_io; /* boolean indicating last io for buf*/ int64_t file_offset; /* file offset */ size_t nbytes; /* request size */ struct aiocb aio; }; #endif /* global sump pump mutex */ static pthread_mutex_t Global_lock = PTHREAD_MUTEX_INITIALIZER; static sp_t Global_external_sp; static int Global_external_count; /* file descriptor for /dev/zero */ static int Zero_fd; /* default size for sp_write_input() and sp_read_output() transfers for * regression testing of those interfaces. */ static size_t Default_rw_test_size; /* default file access mode */ static int Default_file_mode = MODE_DIRECT; /* die - quit program due to a fatal sump pump infrastructure error. */ static void die(char *fmt, ...) { va_list ap; fprintf(stderr, "sump pump fatal error: "); va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); exit(1); } #define PAGE_SIZE page_size() /* page_size - internal routine to get the system page size. */ static int page_size() { static int size; if (size == 0) { #if defined(win_nt) SYSTEM_INFO si; GetSystemInfo(&si); size = si.dwPageSize; #else size = getpagesize(); #endif } return (size); } /* sp_get_time_us - return elapsed time in microseconds. * * The caller can discard the upper 32 bits *only* when timing intervals * less than ~4000 seconds = 1 hour 11 minutes. 
*/ static uint64_t sp_get_time_us(void) { struct timeval time; static uint64_t begin; if (begin == 0) /* if first time */ { if (gettimeofday(&time, NULL) < 0) die("gettimeofday() failure\n"); begin = (time.tv_sec * (uint64_t)1000 * 1000) + time.tv_usec; } if (gettimeofday(&time, NULL) < 0) die("gettimeofday() failure\n"); return (time.tv_sec * (uint64_t)1000 * 1000) + time.tv_usec - begin; } static FILE *TraceFp; #define TRACE if (TraceFp != NULL) trace /* trace - print a trace message */ static void trace(const char *fmt, ...) { va_list ap; uint64_t diff = sp_get_time_us(); int seconds = (int)(diff / 1000000); int fractions = (int)(diff % 1000000); fprintf(TraceFp, "%2d.%06d: ", seconds, fractions); va_start(ap, fmt); vfprintf(TraceFp, fmt, ap); va_end(ap); fflush(TraceFp); } /* sp_get_version - get the subversion version for sump pump * * Returns: a string containing the subversion version. The string should * NOT be free()'d. */ const char *sp_get_version(void) { return (sp_version); } /* sp_get_id - get the subversion id keyword substitution for sump pump * * Returns: a string containing the subversion id keyword. The string should * NOT be free()'d. 
*/ const char *sp_get_id(void) { return ("$Id$"); } /* get_error_msg - place a system error message in the provided buffer */ static char *get_error_msg(int error, char *err_buf, size_t err_buf_size) { #if defined(win_nt) char *lpMsgBuf; char *eol; if (error == 0) /* if no error specified, get last error */ error = GetLastError(); FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, GetModuleHandle(NULL), error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &lpMsgBuf, 0, NULL); eol = lpMsgBuf + strlen(lpMsgBuf); /* remove trailing \r\n so (%d) appears on the same line */ if (eol > lpMsgBuf && eol[-1] == '\n' && eol[-2] == '\r') eol[-2] = '\0'; _snprintf(err_buf, err_buf_size - 1, "%s (%d)", lpMsgBuf, error); LocalFree(lpMsgBuf); return (err_buf); #else if (error == 0) /* if no error specified, get last error */ error = errno; return (strerror_r(error, err_buf, err_buf_size)); #endif } /* start_error - raise an error and set error message during sp_start() * but before the sump pump threads are created. */ static int start_error(sp_t sp, const char *fmt, ...) { va_list ap; int ret; if (sp->error_code != 0) /* if prior error */ return sp->error_code; /* ignore this one */ sp->error_code = SP_START_ERROR; va_start(ap, fmt); ret = vsnprintf(sp->error_buf, sp->error_buf_size, fmt, ap); va_end(ap); #if defined(win_nt) if (ret == -1) /* non-standard vsnprintf overflow indicator on Windows */ { va_start(ap, fmt); ret = _vscprintf(fmt, ap); va_end(ap); } #endif if ((size_t)ret >= sp->error_buf_size) { if (sp->error_buf_size != 0) free(sp->error_buf); sp->error_buf_size = (size_t)ret + 1; sp->error_buf = (char *)malloc(sp->error_buf_size); va_start(ap, fmt); vsnprintf(sp->error_buf, sp->error_buf_size, fmt, ap); va_end(ap); } return SP_START_ERROR; } /* broadcast_all_conds - internal routine to broadcast all sump pump conditions * The sump_mtx should already be locked. 
*/ static void broadcast_all_conds(sp_t sp) { pthread_cond_broadcast(&sp->in_buf_readable_cond); /*multiple sp threads*/ pthread_cond_broadcast(&sp->in_buf_done_cond);/* sp_write_input() caller */ pthread_cond_broadcast(&sp->task_avail_cond); /* multiple sp threads */ pthread_cond_broadcast(&sp->task_drained_cond);/* sp_write_input() caller*/ pthread_cond_broadcast(&sp->task_output_ready_cond); /* mult sp threads */ pthread_cond_broadcast(&sp->task_output_empty_cond); /* mult sp threads */ } /* sp_raise_error - raise an error for a sump pump */ void sp_raise_error(sp_t sp, int error_code, const char *fmt, ...) { va_list ap; int ret; pthread_mutex_lock(&sp->sump_mtx); if (sp->error_code != 0) /* if prior error */ { pthread_mutex_unlock(&sp->sump_mtx); return; /* ignore this one. let prior error stand */ } sp->error_code = error_code; va_start(ap, fmt); ret = vsnprintf(sp->error_buf, sp->error_buf_size, fmt, ap); va_end(ap); #if defined(win_nt) if (ret == -1) /* non-standard vsnprintf overflow indicator on Windows */ { va_start(ap, fmt); ret = _vscprintf(fmt, ap); va_end(ap); } #endif if ((size_t)ret >= sp->error_buf_size) { if (sp->error_buf_size != 0) free(sp->error_buf); sp->error_buf_size = (size_t)ret + 1; sp->error_buf = (char *)malloc(sp->error_buf_size); va_start(ap, fmt); vsnprintf(sp->error_buf, sp->error_buf_size, fmt, ap); va_end(ap); } /* Wake up all possible waiting threads for the sump pump. * Some of the below pthread_cond_broadcasts could just be signals. * But since error handling isn't a performance critical operation, * signals are used instead. */ broadcast_all_conds(sp); pthread_mutex_unlock(&sp->sump_mtx); } #if defined(win_nt) # include "sump_win.c" #else /* init_zero_fd - initialize Zero_fd, the file descriptor for /dev/zero. 
*/ static void init_zero_fd() { pthread_mutex_lock(&Global_lock); if (Zero_fd <= 0) Zero_fd = open("/dev/zero", O_RDWR); pthread_mutex_unlock(&Global_lock); return; } #endif #if !defined(SUMP_PUMP_NO_SORT) /* function pointers to nsort library entry points. These are * non-NULL if the nsort library is linked in. */ nsort_msg_t (*Nsort_define)(const char *def, unsigned options, nsort_error_callback_t *callbacks, nsort_t *ctxp); nsort_msg_t (*Nsort_release_recs)(void *buf, size_t size, nsort_t *ctxp); nsort_msg_t (*Nsort_release_end)(nsort_t *ctxp); nsort_msg_t (*Nsort_return_recs)(void *buf, size_t *size, nsort_t *ctxp); nsort_msg_t (*Nsort_end)(nsort_t *ctxp); const char *(*Nsort_get_stats)(nsort_t *ctxp); char *(*Nsort_message)(nsort_t *ctxp); char *(*Nsort_version)(void); typedef nsort_msg_t (*declare_function_t)(char *name, nsort_compare_t func, void *arg); typedef nsort_msg_t (*define_t)(const char *, unsigned, nsort_callback_t *, nsort_t *ctxp); typedef nsort_msg_t (*merge_define_t)(const char *def, unsigned options, nsort_error_callback_t *callbacks, int merge_width, nsort_merge_callback_t *merge_input, nsort_t *ctxp); typedef nsort_msg_t (*release_recs_t)(void *buf, size_t size, nsort_t *ctxp); typedef nsort_msg_t (*release_end_t)(nsort_t *ctxp); typedef nsort_msg_t (*return_recs_t)(void *buf, size_t *size, nsort_t *ctxp); typedef nsort_msg_t (*print_stats_t)(nsort_t *ctxp, FILE *fp); typedef const char *(*get_stats_t)(nsort_t *ctxp); typedef char *(*message_t)(nsort_t *ctxp); typedef nsort_msg_t (*end_t)(nsort_t *ctxp); typedef char *(*version_t)(void); /* get_nsort_syms - internal routine to dynamically link to nsort library */ static int get_nsort_syms() { # if defined(win_nt) # define dlsym(handle, name) GetProcAddress((handle), (name)) HANDLE syms; if ((syms = LoadLibrary("libnsort.dll")) == NULL) return (-1); # else void *syms; if ((syms = dlopen("libnsort.so", RTLD_GLOBAL | RTLD_LAZY)) == NULL) return (-1); # endif if ((Nsort_define = 
(define_t)dlsym(syms, "nsort_define")) == NULL) return (-2); if ((Nsort_release_recs = (release_recs_t)dlsym(syms, "nsort_release_recs")) == NULL) return (-2); if ((Nsort_release_end = (release_end_t)dlsym(syms, "nsort_release_end")) == NULL) return (-2); if ((Nsort_return_recs = (return_recs_t)dlsym(syms, "nsort_return_recs")) == NULL) return (-2); if ((Nsort_end = (end_t)dlsym(syms, "nsort_end")) == NULL) return (-2); if ((Nsort_get_stats = (get_stats_t)dlsym(syms, "nsort_get_stats")) == NULL) return (-2); if ((Nsort_message = (message_t)dlsym(syms, "nsort_message")) == NULL) return (-2); if ((Nsort_version = (version_t)dlsym(syms, "nsort_version")) == NULL) return (-2); return (0); } /* link_in_nsort - internal routine to, if not already done, dynamically * link in the nsort library. */ static int link_in_nsort() { int ret; # if !defined(win_nt) pthread_mutex_lock(&Global_lock); # endif ret = Nsort_define == NULL ? get_nsort_syms() : 0; # if !defined(win_nt) pthread_mutex_unlock(&Global_lock); # endif return (ret); } /* post_nsort_error - internal routine to post an error received from nsort. */ static void post_nsort_error(sp_t sp, unsigned ret) { pthread_mutex_lock(&sp->sump_mtx); if (sp->error_code == SP_OK) /* if no other error yet, this is the one */ { char *msg = (*Nsort_message)(&sp->nsort_ctx); sp->error_code = SP_SORT_EXEC_ERROR; sp->sort_error = ret; if (msg == NULL) msg = "No Nsort error message"; strncpy(sp->error_buf, msg, sp->error_buf_size); sp->error_buf[sp->error_buf_size - 1] = '\0'; /* handle overflow */ } sp->sort_error = ret; sp->sort_state = SORT_DONE; pthread_cond_broadcast(&sp->task_output_ready_cond); pthread_mutex_unlock(&sp->sump_mtx); } #define STAT_DRCTV " -stat" /* sp_start_sort - start an nsort instance with a sump pump wrapper so * that its input or output can be assigned to a file or * linked to another sump pump. 
For instance, the sort * input or output can be linked to sump pump performing * record pumping such as "map" on input and "reduce" * for output. * Parameters: * sp - Pointer to where to return newly allocated sp_t * identifier that will be used in as the first * argument to all subsequent sp_*() calls. * def - Nsort sort definition string. Besides the Nsort * commands listed in the Nsort User Guide, the following * directives are also recognized: * -match[=%d] Each output record will be preceded by a * single byte that indicates the * specified number of keys in this record * are the same as in the previous record. * If no key number is specified, all keys * are examined for a match condition. * * Returns: SP_OK or a sump pump error code */ int sp_start_sort(sp_t *caller_sp, char *def_fmt, ...) { int ret; char *def_plus = NULL; sp_t sp; size_t def_len; char *def; char thread_drctv[30]; unsigned char *p; # if defined(win_nt) SYSTEM_INFO si; # endif *caller_sp = NULL; /* assume the worst for now */ sp = (sp_t)calloc(1, sizeof(struct sump)); if (sp == NULL) return (SP_MEM_ALLOC_ERROR); sp->error_buf_size = ERROR_BUF_SIZE; sp->error_buf = (char *)calloc(1, sp->error_buf_size); if (sp->error_buf == NULL) return (SP_MEM_ALLOC_ERROR); *caller_sp = sp; /* allow access to error_buf even if failure */ /* fill in default parameters */ # if defined(win_nt) GetSystemInfo(&si); sp->num_threads = si.dwNumberOfProcessors; # else sp->num_threads = sysconf(_SC_NPROCESSORS_ONLN); # endif sprintf(thread_drctv, "-threads=%d ", sp->num_threads); if (sp->num_outputs > 32) sp->num_outputs = 32; sp->num_outputs = 1; sp->out = (struct sump_out *)calloc(1, sizeof(struct sump_out)); sp->delimiter = (void *)"\n"; sp->rec_size = 0; if (link_in_nsort() != 0) /* if error */ { sp->error_code = SP_NSORT_LINK_FAILURE; sp->sort_state = SORT_DONE; return (sp->error_code); } sp->flags |= SP_SORT; if (def_fmt != NULL) { va_list ap; size_t def_len; va_start(ap, def_fmt); #if defined(win_nt) def_len = 
_vscprintf(def_fmt, ap); #else def_len = vsnprintf(NULL, 0, def_fmt, ap); #endif va_end(ap); def = (char *)calloc(def_len + 1, 1); va_start(ap, def_fmt); if (vsnprintf(def, def_len + 1, def_fmt, ap) != def_len) die("sp_start_sort: vnsprintf failed to return %d\n", def_len); va_end(ap); } else def = ""; def_len = strlen(def) + 1; /* plus '\0' */ def_len += strlen(STAT_DRCTV); def_len += strlen(thread_drctv); def_plus = (char *)malloc(def_len); if (def_plus == NULL) return (SP_MEM_ALLOC_ERROR); strcpy(def_plus, thread_drctv); /* goes first so caller can override */ strcat(def_plus, def); strcat(def_plus, STAT_DRCTV); /* check for input file declaration */ for (p = (unsigned char *)def_plus; *p != '\0'; p++) { if (p[0] == '-' || p[0] == '/') { p++; /* detect -IN[_]F[ILE] */ if (toupper(p[0]) == 'I' && toupper(p[1]) == 'N' && (toupper(p[2]) == 'F' || p[2] == '_')) { sp->in_file = ""; p += 3; } /* detect -OUT[_]F[ILE] */ else if (toupper(p[0]) == 'O' && toupper(p[1]) == 'U' && toupper(p[2]) == 'T' && (toupper(p[3]) == 'F' || p[3] == '_')) { sp->out[0].file = ""; p += 4; } /* detect -MATCH */ else if (toupper(p[0]) == 'M' && toupper(p[1]) == 'A' && toupper(p[2]) == 'T' && toupper(p[3]) == 'C' && toupper(p[4]) == 'H') { sp->match_keys = TRUE; p += 5; } } } /* check for output file declaration */ ret = (*Nsort_define)(def_plus, 0, NULL, &sp->nsort_ctx); free(def_plus); if (def_fmt != NULL) free(def); if (ret < 0) { char *msg = (*Nsort_message)(&sp->nsort_ctx); sp->sort_error = ret; sp->error_code = SP_SORT_DEF_ERROR; if (msg == NULL) msg = "No Nsort error message"; strncpy(sp->error_buf, msg, sp->error_buf_size - 1); sp->error_buf[sp->error_buf_size - 1] = '\0'; /* handle overflow */ sp->sort_state = SORT_DONE; return (SP_SORT_DEF_ERROR); } sp->sort_state = sp->in_file != NULL ? 
SORT_OUTPUT : SORT_INPUT; pthread_mutex_init(&sp->sump_mtx, NULL); /* use output ready cond for state changes */ pthread_cond_init(&sp->task_output_ready_cond, NULL); sp->sort_temp_buf_size = sp->out[0].buf_size ? sp->out[0].buf_size : 4096; if ((sp->sort_temp_buf = (char *)malloc(sp->sort_temp_buf_size)) == NULL) return (SP_MEM_ALLOC_ERROR); return (SP_OK); } /* sp_get_sort_stats - get a string containing the nsort statistics report * for an nsort sump pump that has completed. * * Returns: a string containing the Nsort statistics report. The string should * NOT be free()'d and is valid until the passed sp_t is sp_free()'d. */ const char *sp_get_sort_stats(sp_t sp) { const char *ret; if (sp->error_code) return ("no stats because of nsort error"); if (!(sp->flags & SP_SORT)) return (NULL); if ((ret = (*Nsort_get_stats)(&sp->nsort_ctx)) == NULL) ret = "Nsort_get_stats() failure\n"; return (ret); } /* sp_get_nsort_version - get the subversion version for sump pump * * Returns: a string containing the Nsort version number. The string should * NOT be free()'d. */ const char *sp_get_nsort_version(void) { if (link_in_nsort() != 0) /* if error */ { return ("Nsort could not be linked in"); } return ((*Nsort_version)()); } #else /* sp_start_sort - dummy non-sort version. */ int sp_start_sort(sp_t *caller_sp, char *def_fmt, ...) { *caller_sp = NULL; return (SP_SORT_NOT_COMPILED); } /* sp_get_sort_stats - dummy non-sort version. */ const char *sp_get_sort_stats(sp_t sp) { return (""); } /* sp_get_nsort_version - dummy non-sort version. */ const char *sp_get_nsort_version(void) { return ("this SUMP Pump version has not been compiled for nsort"); } #endif /* !defined(SUMP_PUMP_NO_SORT) */ /* pipe_reader - thread start function for thread that reads either the * stdout or stderr of an external process, writes the output * to either task output 0 or 1, respectively. 
 *
 * Parameters:
 *      arg -   the struct std_pipe to read from.  Its "ex" member leads to
 *              the exec state, and from there to the task and sump pump.
 *
 * Returns: NULL always.  A read failure is reported by storing ERRNO in the
 *          pipe's perrno member; other failures are posted with
 *          pfunc_error().  The read end of the pipe is closed before
 *          returning.
 */
void *pipe_reader(void *arg)
{
    struct sump         *sp;
    struct exec_state   *ex;
    struct std_pipe     *pipe;
    sp_task_t           t;
    char                *buf;
    int                 out_index;
    ssize_t             len;
    ssize_t             total = 0;
    ssize_t             buf_size;
    ssize_t             buf_bytes;
    int                 ret;

    pipe = (struct std_pipe *)arg;
    ex = pipe->ex;
    t = ex->t;
    sp = t->sp;

#if defined(SUMP_PIPE_STDERR)
    /* determine if we are reading stdout or stderr */
    if (pipe == &ex->err)
    {
        buf = ex->err_buf;
        out_index = 1;
    }
    else
#endif
    {
        buf = ex->out_buf;
        out_index = 0;
    }

    TRACE("pipe_reader%d: started\n", t->thread_index);

    /* whole-buffer mode (SP_WHOLE_BUF flag set): read directly into the
     * task's output buffer rather than going through pfunc_write() */
    if (sp->flags & SP_WHOLE_BUF)
    {
        /* get output buffer.  buf is redirected from the exec state's
         * pipe buffer to the buffer returned here */
        ret = pfunc_get_out_buf(t, out_index, (void **)&buf, (size_t *)&buf_size);
        if (ret != 0)
        {
            pfunc_error(t, "internal error: "
                        "bad ret from sp_get_out_buf: %d\n", ret);
            goto reader_return;
        }

        /* read from pipe into output buffer, flushing buffer when necessary */
        buf_bytes = 0;
        for (;;)
        {
#if defined(win_nt)
            DWORD   size;

            if (ReadFile(pipe->rd_h, buf + buf_bytes, buf_size - buf_bytes, &size, NULL))
                len = size;     /* success */
            else if (GetLastError() == ERROR_BROKEN_PIPE)
                len = 0;        /* eof */
            else
                len = -1;       /* failure */
#else
            len = read(pipe->rd_fd, buf + buf_bytes, buf_size - buf_bytes);
#endif
            if (len == 0)
                break;
            if (len < 0)
            {
                /* record the failure; zeroing buf_bytes means the bytes
                 * accumulated so far are not committed below */
                pipe->perrno = ERRNO;
                buf_bytes = 0;
                break;
            }
            buf_bytes += len;
            if (buf_bytes == buf_size)
            {
                /* commit/flush output bytes */
                ret = pfunc_put_out_buf_bytes(t, out_index, buf_bytes);
                if (ret != 0)
                {
                    pfunc_error(t, "internal error: "
                                "bad ret from sp_put_out_buf_bytes: %d\n",ret);
                    goto reader_return;
                }
                buf_bytes = 0;
            }
        }
        if (buf_bytes != 0)
        {
            /* commit/flush the final, partially filled buffer */
            ret = pfunc_put_out_buf_bytes(t, out_index, buf_bytes);
            if (ret != 0)
            {
                pfunc_error(t, "internal error: "
                            "bad ret from sp_put_out_buf_bytes: %d\n",ret);
                goto reader_return;
            }
        }
    }
    else
    {
        /* read from pipe and write to task output */
        for (;;)
        {
#if defined(win_nt)
            DWORD   size;

            if (ReadFile(pipe->rd_h, buf, PIPE_BUF_SIZE, &size, NULL))
                len = size;     /* success */
            else if (GetLastError() == ERROR_BROKEN_PIPE)
                len = 0;        /* eof */
            else
                len = -1;       /* failure */
#else
            len = read(pipe->rd_fd, buf, PIPE_BUF_SIZE);
#endif
            TRACE("pipe_reader%d: read %d bytes\n", t->thread_index, (int)len);
            if (len == 0)
                break;
            if (len < 0)
            {
                pipe->perrno = ERRNO;
                break;
            }
            /* NOTE(review): total is accumulated here but is not read
             * anywhere in this function */
            total += len;
            if (pfunc_write(t, out_index, buf, len) != len)
            {
                pfunc_error(t, "internal error: pfunc_write failure\n");
                break;
            }
        }
    }

reader_return:
#if defined(win_nt)
    CloseHandle(pipe->rd_h);
#else
    /* close the read fd and mark it invalid while holding the pump
     * function mutex */
    pfunc_mutex_lock(t);
    {
        close(pipe->rd_fd);
        pipe->rd_fd = INVALID_FD;
    }
    pfunc_mutex_unlock(t);
#endif
    TRACE("pipe_reader%d: returning\n", t->thread_index);
    return (NULL);
}

/* pfunc_exec - internal pump function to pipe the task input to an external
 *              process, and read back the process stdout and stderr and
 *              write to task outputs 0 and 1 respectively.
 */
int pfunc_exec(sp_task_t t, void *unused)
{
    char                *rec;
    struct exec_state   *ex;
    int                 ti = pfunc_get_thread_index(t);
    struct sump         *sp;
    pthread_t           out_thread;
#if defined(SUMP_PIPE_STDERR)
    pthread_t           err_thread;
#endif
    int                 buf_bytes;
    pid_t               child;
    int                 status;
    char                errmsg[100];
    /* just_input_files is a half-finished implementation using temporary files
     * rather than pipes to transport input data to the external program */
    int                 just_input_files = 0;
#if defined(win_nt)
    DWORD               wr_size;
    PROCESS_INFORMATION pi;
#else
    int                 tempfds[2];
#endif

    /* each sump pump thread has its own exec state, selected by the
     * thread index of the task */
    sp = t->sp;
    ex = &sp->ex_state[ti];
    ex->t = t;
    ex->in.perrno = 0;
    ex->out.perrno = 0;
#if defined(SUMP_PIPE_STDERR)
    ex->err.perrno = 0;
#endif

    if (!just_input_files)
    {
#if defined(win_nt)
        SECURITY_ATTRIBUTES     sa;
        STARTUPINFOA            sui;

        sa.nLength = sizeof(SECURITY_ATTRIBUTES);
        sa.bInheritHandle = TRUE;
        sa.lpSecurityDescriptor = NULL;

        /* create pipe for stdin of external process */
        if (!CreatePipe(&ex->in.rd_h, &ex->in.wr_h, &sa, 0))
            return (pfunc_error(t, "CreatePipe() error: %s\n",
                                get_error_msg(0, errmsg, sizeof(errmsg))));
        /* make sure the write handle is not inherited */
        if (!SetHandleInformation(ex->in.wr_h,
HANDLE_FLAG_INHERIT, 0)) return (pfunc_error(t, "SetHandleInformation() error: %s\n", get_error_msg(0, errmsg, sizeof(errmsg)))); /* create pipe for stdout of external process */ if (!CreatePipe(&ex->out.rd_h, &ex->out.wr_h, &sa, 0)) return (pfunc_error(t, "CreatePipe() error: %s\n", get_error_msg(0, errmsg, sizeof(errmsg)))); /* make sure the read handle is not inherited */ if (!SetHandleInformation(ex->out.rd_h, HANDLE_FLAG_INHERIT, 0)) return (pfunc_error(t, "SetHandleInformation() error: %s\n", get_error_msg(0, errmsg, sizeof(errmsg)))); ZeroMemory(&sui, sizeof(STARTUPINFOA)); sui.cb = sizeof(STARTUPINFO); sui.hStdInput = ex->in.rd_h; sui.hStdOutput = ex->out.wr_h; sui.hStdError = GetStdHandle(STD_ERROR_HANDLE); /* use own handle */ sui.dwFlags |= STARTF_USESTDHANDLES; ZeroMemory(&pi, sizeof(PROCESS_INFORMATION)); if (!CreateProcessA(NULL, /* NULL process name forces use of cmd line * where the PATH may be used to find exe */ sp->exec_argv[0],/* command line */ NULL, /* process security attributes */ NULL, /* thread security attributes */ TRUE, /* handles are inherited */ 0, /* flags */ NULL, /* environment */ NULL, /* current directory */ &sui, /* startup info */ &pi)) /* process info */ { return (pfunc_error(t, "CreateProcess() error: %s\n", get_error_msg(0, errmsg, sizeof(errmsg)))); } CloseHandle(pi.hThread); /* close the original write handle for external proc's stdout */ if (!CloseHandle(ex->out.wr_h)) { return (pfunc_error(t, "pipe_reader, CloseHandle failure: %d\n", GetLastError())); } #else /* lock out other pump functions (even in other sump pumps) * and their fd opens and closes */ pthread_mutex_lock(&Global_lock); if (Global_external_count == 0) Global_external_sp = sp; else if (Global_external_sp != sp) { /* there is more than one sump pump simultaneously active * invoking an external process. This code is not set up to * properly close the write fds (to the stdins of its external * processes) for the other sump pump. 
For now, declare error * and return. */ pthread_mutex_unlock(&Global_lock); return (pfunc_error(t, "more than one sump active with external processes\n")); } /* use curly braces to better illustrate mutex block */ { if (pipe(tempfds) != 0) { pthread_mutex_unlock(&Global_lock); return (pfunc_error(t, "pipe() error: %s\n", strerror(errno))); } ex->in.rd_fd = tempfds[0]; ex->in.wr_fd = tempfds[1]; if (pipe(tempfds) != 0) { pthread_mutex_unlock(&Global_lock); return (pfunc_error(t, "pipe() error: %s\n", strerror(errno))); } ex->out.rd_fd = tempfds[0]; ex->out.wr_fd = tempfds[1]; # if defined(SUMP_PIPE_STDERR) if (pipe(tempfds) != 0) { pthread_mutex_unlock(&Global_lock); return (pfunc_error(t, "pipe() error: %s\n", strerror(errno))); } ex->err.rd_fd = tempfds[0]; ex->err.wr_fd = tempfds[1]; # endif child = fork(); if (child == -1) { pthread_mutex_unlock(&Global_lock); return (pfunc_error(t, "fork() error: %s\n", strerror(errno))); } else if (child == 0) { /* current process is the newly created child */ int i; if (dup2(ex->in.rd_fd, STDIN_FILENO) == -1) { perror("dup2() for stdin"); exit(1); } close(ex->in.rd_fd); ex->in.rd_fd = INVALID_FD; if (dup2(ex->out.wr_fd, STDOUT_FILENO) == -1) { perror("dup2() for stdout"); exit(1); } close(ex->out.wr_fd); ex->out.wr_fd = INVALID_FD; # if defined(SUMP_PIPE_STDERR) if (dup2(ex->err.wr_fd, STDERR_FILENO) == -1) { perror("dup2() for stderr"); exit(1); } close(ex->err.wr_fd); ex->err.wr_fd = INVALID_FD; # endif /* close all sump-pump-end pipe file descriptors. 
* presumably closing these red */ for (i = 0; i < sp->num_threads; i++) { if (sp->ex_state[i].in.wr_fd != INVALID_FD) close(sp->ex_state[i].in.wr_fd); if (sp->ex_state[i].out.rd_fd != INVALID_FD) close(sp->ex_state[i].out.rd_fd); # if defined(SUMP_PIPE_STDERR) if (sp->ex_state[i].err.rd_fd != INVALID_FD) close(sp->ex_state[i].err.rd_fd); # endif } /* exec program */ execvp(sp->exec_argv[0], sp->exec_argv); /* if this path is reached, then the previous exec failed */ exit(errno); } /* this is the pfunc thread (and not the child process) */ /* close the child's ends of the 3 pipes */ close(ex->in.rd_fd); ex->in.rd_fd = INVALID_FD; close(ex->out.wr_fd); ex->out.wr_fd = INVALID_FD; # if defined(SUMP_PIPE_STDERR) close(ex->err.wr_fd); ex->err.wr_fd = INVALID_FD; # endif } Global_external_count++; pthread_mutex_unlock(&Global_lock); #endif /* create thread for each of stdout and stderr */ if (pthread_create(&out_thread, NULL, pipe_reader, &ex->out) != 0) return (pfunc_error(t, "pthread_create out error: %d\n", ERRNO)); #if defined(SUMP_PIPE_STDERR) if (pthread_create(&err_thread, NULL, pipe_reader, &ex->err) != 0) return (pfunc_error(t, "pthread_create err error: %d\n", ERRNO)); #endif } else { #if defined(win_nt) #else char fname[50]; sprintf(fname, "sptaskinput%llu", (long long unsigned int)t->task_number); ex->in.wr_fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0777); if (ex->in.wr_fd < 0) { strerror_r(errno, errmsg, sizeof(errmsg)); return(pfunc_error(t, "can't open debug input file '%s': %s\n", fname, errmsg)); } #endif } /* if we are just reading a whole input buffer, not record by record. 
*/ if (sp->flags & SP_WHOLE_BUF) { unsigned char *in; size_t in_size; int ret; /* get input buffer */ if ((ret = pfunc_get_in_buf(t, (void **)&in, &in_size)) != 0) return (pfunc_error(t, "internal pfunc error: " "bad ret from sp_get_in_buf: %d\n", ret)); #if defined(win_nt) if (!WriteFile(ex->in.wr_h, in, in_size, &wr_size, NULL)) #else if (write(ex->in.wr_fd, in, in_size) != in_size) #endif ex->in.perrno = ERRNO; } else /* the records are lines of text */ { buf_bytes = 0; for (;;) { /* for each record in the group */ while (pfunc_get_rec(t, &rec) > 0) { int len; int copy_size; /* buffer and write to in.wr_fd */ len = strlen(rec); while (len) { copy_size = len; if (copy_size > PIPE_BUF_SIZE - buf_bytes) copy_size = PIPE_BUF_SIZE - buf_bytes; memcpy(ex->in_buf + buf_bytes, rec, copy_size); buf_bytes += copy_size; if (buf_bytes == PIPE_BUF_SIZE) { #if defined(win_nt) if (!WriteFile(ex->in.wr_h, ex->in_buf, buf_bytes, &wr_size, NULL)) #else if (write(ex->in.wr_fd, ex->in_buf, buf_bytes) != buf_bytes) #endif { ex->in.perrno = ERRNO; buf_bytes = 0; break; } buf_bytes = 0; rec += copy_size; } len -= copy_size; } } /* if we are doing a "group by" and we have not finished all the * records in the first input buffer, then reset the eof state to * continue reading records from the first input buffer. 
*/ if ((sp->flags & SP_GROUP_BY) && t->first_in_buf) { t->input_eof = FALSE; t->first_group_rec = TRUE; } else break; /* done with first input buffer and maybe more */ } /* flush any remainder */ if (buf_bytes != 0) { #if defined(win_nt) if (!WriteFile(ex->in.wr_h, ex->in_buf, buf_bytes, &wr_size, NULL)) #else if (write(ex->in.wr_fd, ex->in_buf, buf_bytes) != buf_bytes) #endif ex->in.perrno = ERRNO; } } #if defined(win_nt) CloseHandle(ex->in.wr_h); #else pthread_mutex_lock(&Global_lock); close(ex->in.wr_fd); ex->in.wr_fd = INVALID_FD; Global_external_count--; pthread_mutex_unlock(&Global_lock); #endif if (!just_input_files) { #if defined(win_nt) DWORD exitCode; DWORD rv; /* wait for external process */ rv = WaitForSingleObject(pi.hProcess, INFINITE); if (rv != WAIT_OBJECT_0) return (pfunc_error(t, "process WaitForSingleObject() ret: %d, error: %s\n", rv, get_error_msg(0, errmsg, sizeof(errmsg)))); if (!GetExitCodeProcess(pi.hProcess, &exitCode)) return (pfunc_error(t, "GetExitCodeProcess() error: %s\n", get_error_msg(0, errmsg, sizeof(errmsg)))); TRACE("pfunc_exec%d: ext proc exited with: %d\n", t->thread_index, exitCode); if (exitCode != 0) { pfunc_error(t, "process '%s' instance %I64u exited with status: 0x%x\n", sp->exec_argv[0], t->task_number, exitCode); } CloseHandle(pi.hProcess); #else /* if error writing to pipe and is not broken pipe error */ if (ex->in.perrno != 0 && ex->in.perrno != EPIPE) { strerror_r(ex->in.perrno, errmsg, sizeof(errmsg)); TRACE("pfunc_exec%d: stdin write error: %d, %s\n", t->thread_index, ex->in.perrno, errmsg); return (pfunc_error(t, "pipe stdin write error: %s\n", errmsg)); } /* wait for child process to end */ if (waitpid(child, &status, 0) == -1) { strerror_r(errno, errmsg, sizeof(errmsg)); TRACE("pfunc_exec%d: child wait error: %d, %s\n", t->thread_index, errno, errmsg); return (pfunc_error(t, "pipe process wait error: %s\n", errmsg)); } if (WIFSIGNALED(status)) { TRACE("pfunc_exec%d: child terminated with signal %d\n", 
t->thread_index, WTERMSIG(status)); return (pfunc_error(t, "external process terminated with signal %d\n", WTERMSIG(status))); } if (WIFSTOPPED(status)) { TRACE("pfunc_exec%d: child stopped with signal %d\n", t->thread_index, WSTOPSIG(status)); return (pfunc_error(t, "external process stopped with signal %d\n", WSTOPSIG(status))); } if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { TRACE("pfunc_exec%d: child exited with status: %d\n", t->thread_index, WEXITSTATUS(status)); pfunc_error(t, "process '%s' instance %llu exited with status: %d\n", sp->exec_argv[0], t->task_number, WEXITSTATUS(status)); } #endif /* wait for stdout thread to end */ pthread_join(out_thread, NULL); TRACE("pfunc_exec%d: out thread exited with errno: %d\n", t->thread_index, ex->out.perrno); if (ex->out.perrno != 0) { get_error_msg(ex->out.perrno, errmsg, sizeof(errmsg)); TRACE("pfunc_exec%d: out thread exited on error: %d, %s\n", t->thread_index, ex->out.perrno, errmsg); pfunc_error(t, "pipe stdout read error: %s\n", errmsg); } #if defined(SUMP_PIPE_STDERR) /* wait for stderr thread to end */ pthread_join(err_thread, NULL); TRACE("pfunc_exec%d: error thread exited with errno: %d\n", t->thread_index, ex->err.perrno); if (ex->err.perrno != 0) { get_error_msg(ex->err.perrno, errmsg, sizeof(errmsg)); TRACE("pfunc_exec%d: error thread exited on error: %d, %s\n", t->thread_index, ex->err.perrno, errmsg); pfunc_error(t, "pipe stderr read error: %s\n", errmsg); } #endif } return (t->error_code); } /* file_reader_test - test routine for a file reader thread using * normal read() calls into an intermediate * buffer followed by writes into the sump pump. 
*/
static void *file_reader_test(void *arg)
{
    int         size;
    char        *read_buf;
    sp_file_t   sp_src = (sp_file_t)arg;        /* the input file to read */
    sp_t        sp = sp_src->sp;                /* sump pump being fed */
    char        err_buf[200];

    /* transfer_size is cast to int to match the %d conversion, as in the
     * sp_raise_error() call below
     */
    TRACE("file_reader_test: allocating %d buffer bytes\n",
          (int)sp_src->transfer_size);
    read_buf = (char *)malloc(sp_src->transfer_size);
    if (read_buf == NULL)
    {
        sp_raise_error(sp, SP_MEM_ALLOC_ERROR,
                       "%s: buffer malloc failure, size %d\n",
                       sp_src->fname, (int)sp_src->transfer_size);
    }

    /* keep looping until there is no additional input
     * (the loop is skipped entirely if the allocation failed)
     */
    for ( ; read_buf != NULL; )
    {
#if defined(win_nt)
        DWORD   rlen;

        if (ReadFile(sp_src->fd, read_buf, sp_src->transfer_size, &rlen, NULL))
            size = rlen;        /* success */
        else if (GetLastError() == ERROR_BROKEN_PIPE)
            size = 0;           /* eof */
        else
            size = -1;          /* failure */
#else
        size = read(sp_src->fd, read_buf,
                    (unsigned int)sp_src->transfer_size);
#endif
        if (size < 0)
        {
            sp_raise_error(sp, SP_FILE_READ_ERROR,
                           "%s: read() failure: %s\n",
                           sp_src->fname,
                           get_error_msg(0, err_buf, sizeof(err_buf)));
            break;
        }
        /* a size of 0 (eof) is also passed on to sp_write_input() */
        if (sp_write_input(sp, read_buf, size) != size)
            break;      /* silently quit on a downstream error */
        if (size == 0)  /* if we just sent 0 bytes to sp_write_input() */
            break;
    }

    free(read_buf);     /* free(NULL) is a no-op */
    TRACE("file_reader_test done: %d\n", sp_src->error_code);
    return (NULL);
}


/* file_reader_buffered - main routine for a file reader thread using normal
 *                        read() calls directly into sump pump input buffers.
*/ static void *file_reader_buffered(void *arg) { size_t size; size_t buf_size; size_t filled_bytes; size_t request; uint64_t index; char *read_buf; int eof; int ret; sp_file_t sp_src = (sp_file_t)arg; sp_t sp = sp_src->sp; char err_buf[200]; #if defined(win_nt) DWORD rlen; #endif TRACE("file_reader_buffered starting\n"); /* keep looping until there is no additional input */ for (index = 0; ; index++) { if (sp_get_in_buf(sp, index, (void **)&read_buf, &buf_size) != SP_OK) break; for (filled_bytes = 0; filled_bytes < buf_size; filled_bytes += size) { /* calculate read request size */ request = buf_size - filled_bytes; /* limit request size to 2GB */ if (request > 0x80000000) request = 0x80000000; if (sp_src->transfer_size != 0 && request > sp_src->transfer_size) request = sp_src->transfer_size; #if defined(win_nt) if (ReadFile(sp_src->fd, read_buf + filled_bytes, (DWORD)request, &rlen, NULL)) size = rlen; /* success */ else if (GetLastError() == ERROR_BROKEN_PIPE) size = 0; /* eof */ else size = -1; /* failure */ #else size = read(sp_src->fd, read_buf + filled_bytes, (unsigned int)request); #endif TRACE("file_reader: read %d bytes\n", (int)size); if (size < 0) { sp_raise_error(sp, SP_FILE_READ_ERROR, "%s: read() failure: %s\n", sp_src->fname, get_error_msg(0, err_buf, sizeof(err_buf))); filled_bytes = 0; break; } if (size == 0) /* if EOF */ break; } eof = (filled_bytes < request); if ((ret = sp_put_in_buf_bytes(sp, index, filled_bytes, eof)) != SP_OK) { TRACE("file_reader: sp_put_in_buf_bytes ret: %d\n", ret); break; /* silently quit on a downstream error */ } if (eof) break; } #if defined(win_nt) CloseHandle(sp_src->fd); sp_src->fd = INVALID_HANDLE_VALUE; #else /*close(sp_src->fd);*/ sp_src->fd = INVALID_FD; #endif TRACE("file_reader_buffered done: %d\n", sp_src->error_code); return (NULL); } /* file_writer_buffered - main routine for a file writer thread using normal * write() calls. 
*/ static void *file_writer_buffered(void *arg) { char *buf; ssize_t size; sp_file_t sp_dst = (sp_file_t)arg; sp_t sp = sp_dst->sp; int64_t file_size = 0; #if defined(win_nt) DWORD wr_size; #else int fd = sp_dst->fd; #endif int out_index = sp_dst->out_index; char err_buf[200]; TRACE("file_writer_buffered: allocating %d buffer bytes\n", sp_dst->transfer_size); buf = (char *)malloc(sp_dst->transfer_size); if (buf == NULL) { sp_raise_error(sp, SP_MEM_ALLOC_ERROR, "%s: buffer malloc failure, size %d\n", sp_dst->fname, (int)sp_dst->transfer_size); } for ( ; buf != NULL; ) { size = sp_read_output(sp, out_index, buf, sp_dst->transfer_size); if (size <= 0) { if (size < 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; } break; } TRACE("file_writer: writing %d bytes at offset %"PTFlld"\n", size, file_size); #if defined(win_nt) if (!WriteFile(sp_dst->fd, buf, size, &wr_size, NULL)) #else if (write(fd, buf, (unsigned int)size) != size) #endif { sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: write() failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_FILE_WRITE_ERROR; TRACE("output file write error: %s\n", err_buf); break; } file_size += size; } if (buf != NULL && sp_dst->can_seek) { #if defined(win_nt) if (!SetEndOfFile(sp_dst->fd)) #else if (ftruncate(sp_dst->fd, file_size)) #endif { sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: truncate failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_FILE_WRITE_ERROR; TRACE("truncate error: %s\n", err_buf); } } #if defined(win_nt) CloseHandle(sp_dst->fd); sp_dst->fd = INVALID_HANDLE_VALUE; #else /*close(sp_dst->fd);*/ sp_dst->fd = INVALID_FD; #endif if (buf != NULL) free(buf); TRACE("file_writer_buffered done: %d\n", sp_dst->error_code); return (NULL); } #if defined(AIO_CAPABLE) /* file_reader_direct - main routine for a file reader thread using direct * aio_read() calls on sump pump input buffers. 
*/ static void *file_reader_direct(void *arg) { ssize_t size; ssize_t request; size_t in_buf_size; uint64_t aios_started; int64_t file_read_offset = 0; struct sump_aio *spaio; struct aiocb *aio; const struct aiocb *cb[1]; int eof; sp_file_t sp_src = (sp_file_t)arg; sp_t sp = sp_src->sp; char err_buf[200]; int aio_count; int start; int done; uint64_t next_in_buf; size_t next_buf_offset; char *buf; int put_result; #if defined(win_nt) int i; #endif if (sp_src->aio_count <= 0) aio_count = 2; else aio_count = sp_src->aio_count; TRACE("file_reader_direct allocating %d aio structs\n", aio_count); spaio = (struct sump_aio *)calloc(sizeof(struct sump_aio), aio_count); if (spaio == NULL) { sp_raise_error(sp, SP_MEM_ALLOC_ERROR, "%s: aio malloc failure, size %d\n", sp_src->fname, (int)aio_count); TRACE("file_reader_direct done: %d\n", sp_src->error_code); return (NULL); } #if defined(win_nt) for (i = 0; i < aio_count; i++) { spaio[i].aio.sump_over.hEvent = CreateEvent(NULL, 1, 0, NULL); if (spaio[i].aio.sump_over.hEvent == NULL) die("file_reader_direct, CreateEvent failed\n"); } #endif /* keep looping until there is no additional input */ next_in_buf = 0; next_buf_offset = 0; for (aios_started = 0; ; aios_started++) { start = aios_started % aio_count; aio = &spaio[start].aio; aio->aio_fildes = sp_src->fd; /* if getting of the buffer fails. 
*/ if (sp_get_in_buf(sp, next_in_buf, (void **)&buf, &in_buf_size) != SP_OK) { sp_raise_error(sp, SP_FILE_READ_ERROR, "sp_get_in_buf() failure with in_buf %lld\n", aios_started); break; } request = in_buf_size - next_buf_offset; if (request > sp_src->transfer_size) request = sp_src->transfer_size; aio->aio_buf = buf + next_buf_offset; aio->aio_nbytes = request; aio->aio_offset = file_read_offset; spaio[start].buf_index = next_in_buf; spaio[start].buf_offset = next_buf_offset; spaio[start].file_offset = file_read_offset; spaio[start].nbytes = request; spaio[start].last_buf_io = (next_buf_offset + request == in_buf_size); next_buf_offset += request; TRACE("file_reader: reading %d bytes at offset %"PTFlld"\n", request, file_read_offset); if (next_buf_offset == in_buf_size) { next_in_buf++; next_buf_offset = 0; } if (aio_read(aio) < 0) { get_error_msg(aio_error(aio), err_buf, sizeof(err_buf)); TRACE("file_reader_direct: read failed: %s\n", err_buf); sp_src->error_code = SP_FILE_READ_ERROR; sp_raise_error(sp, SP_FILE_READ_ERROR, "%s: aio_read() failure: %s, " "offset: %"PTFlld", size: %"PTFlld"\n", sp_src->fname, err_buf, aio->aio_offset, (int64_t)request); break; } file_read_offset += request; /* if the max number of aios has not been started, then start another. */ if (aios_started < aio_count - 1) continue; /* wait for a previously issued request. 
*/ done = (aios_started + 1) % aio_count; aio = &spaio[done].aio; request = spaio[done].nbytes; cb[0] = aio; if (aio_suspend(cb, 1, NULL) != 0) { sp_src->error_code = SP_FILE_READ_ERROR; sp_raise_error(sp, SP_FILE_READ_ERROR, "%s: aio_suspend() failure: %s, " "offset: %"PTFlld", size: %"PTFlld"\n", sp_src->fname, get_error_msg(0, err_buf, sizeof(err_buf)), spaio[done].file_offset, (int64_t)request); break; } if ((size = aio_return(aio)) < 0) { sp_src->error_code = SP_FILE_READ_ERROR; sp_raise_error(sp, SP_FILE_READ_ERROR, "%s: aio_return() failure: %s, " "offset: %"PTFlld", size: %"PTFlld"\n", sp_src->fname, get_error_msg(aio_error(aio), err_buf, sizeof(err_buf)), spaio[done].file_offset, (int64_t)request); break; } eof = (size < request); /* "put" the input buffer bytes if some bytes have been read into * the buffer and either eof or that was the last read for the buffer */ size += spaio[done].buf_offset; put_result = SP_OK; if (size && (eof || spaio[done].last_buf_io)) { put_result = sp_put_in_buf_bytes(sp, spaio[done].buf_index, size, eof); } if (put_result != SP_OK || eof) { /* clean up remaining aios and finish up */ if (aio_count != 1) { /* wait for and ignore all previously issued aio'ess */ do { done = (done + 1) % aio_count; cb[0] = &spaio[done].aio; aio_suspend(cb, 1, NULL); } while (done != start); } break; } } #if defined(win_nt) for (i = 0; i < aio_count; i++) CloseHandle(spaio[i].aio.sump_over.hEvent); CloseHandle(sp_src->fd); sp_src->fd = INVALID_HANDLE_VALUE; #else /*close(sp_src->fd);*/ sp_src->fd = INVALID_FD; #endif free(spaio); TRACE("file_reader_direct done: %d\n", sp_src->error_code); return (NULL); } /* file_writer_direct - main routine for a file writer thread using direct * aio_write() calls. 
*/ static void *file_writer_direct(void *arg) { char *buf; ssize_t size; ssize_t alloc_size; sp_file_t sp_dst = (sp_file_t)arg; sp_t sp = sp_dst->sp; int out_index = sp_dst->out_index; ssize_t request; uint64_t aios_started; uint64_t aios_completed; int64_t file_write_offset = 0; struct sump_aio *spaio; struct aiocb *aio; const struct aiocb *cb[1]; int aio_count; char err_buf[200]; int start; int done; int eof; int ret; char *remainder_start = NULL; int64_t remainder_size = 0; #if defined(win_nt) LONG high; LONG low; int i; #endif if (sp_dst->aio_count <= 0) aio_count = 2; else aio_count = sp_dst->aio_count; spaio = (struct sump_aio *)calloc(sizeof(struct sump_aio), aio_count); if (spaio == NULL) { sp_raise_error(sp, SP_MEM_ALLOC_ERROR, "%s: aio malloc failure, size %d\n", sp_dst->fname, (int)aio_count); TRACE("file_writer_direct done: %d\n", sp_dst->error_code); return (NULL); } TRACE("file_writer_direct[%d]: allocating %d buffer bytes\n", out_index, sp_dst->transfer_size); alloc_size = sp_dst->transfer_size * aio_count; #if defined(win_nt) buf = VirtualAlloc(NULL, alloc_size, MEM_COMMIT, PAGE_READWRITE); if (buf == NULL) { sp_raise_error(sp, SP_MEM_ALLOC_ERROR, "VirtualAlloc() failure: %s\n", get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_MEM_ALLOC_ERROR; return NULL; } for (i = 0; i < aio_count; i++) { spaio[i].aio.sump_over.hEvent = CreateEvent(NULL, 1, 0, NULL); if (spaio[i].aio.sump_over.hEvent == NULL) die("file_writer_direct, CreateEvent failed\n"); } #else init_zero_fd(); buf = mmap(NULL, alloc_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, Zero_fd, 0); if (buf == MAP_FAILED) { sp_raise_error(sp, SP_MEM_ALLOC_ERROR, "mmap() failure: %s\n", strerror_r(errno, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_MEM_ALLOC_ERROR; return NULL; } #endif aios_completed = 0; for (aios_started = 0; ; ) { start = aios_started % aio_count; aio = &spaio[start].aio; aio->aio_fildes = sp_dst->fd; aio->aio_buf = buf + start * sp_dst->transfer_size; 
request = sp_read_output(sp, out_index, (void *)aio->aio_buf, sp_dst->transfer_size); if (request < 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; break; } /* if not a write or not a full write, then its eof. */ eof = (request < sp_dst->transfer_size); if (eof && (remainder_size = request % PAGE_SIZE) != 0) { request -= remainder_size; remainder_start = (char *)aio->aio_buf + request; } if (request != 0) { TRACE("file_writer[%d]: writing %"PTFlld" bytes at offset %"PTFlld"\n", out_index, (int64_t)request, file_write_offset); aio->aio_nbytes = request; aio->aio_offset = file_write_offset; spaio[start].buf_index = 0; /* not used */ spaio[start].buf_offset = 0; /* not used */ spaio[start].file_offset = file_write_offset; spaio[start].nbytes = request; spaio[start].last_buf_io = 0; /* not used */ if (aio_write(aio) < 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: aio_write() failure: %s, " "offset: %lld, size: %lld\n", sp_dst->fname, get_error_msg(aio_error(&aio[start]), err_buf, sizeof(err_buf)), spaio[done].file_offset, request); break; } file_write_offset += request; aios_started++; } else if (aios_started == 0) /* if no direct output whatsoever */ break; /* if not eof and the max number of aios has not been started, * then start another. */ if (!eof && aios_started < aio_count) continue; /* wait for a previously issued request. * if eof, then wait for all previously issued requests. 
*/ do { done = aios_completed % aio_count; aio = &spaio[done].aio; request = spaio[done].nbytes; cb[0] = aio; #if defined(win_nt) ret = aio_suspend(cb, 1, NULL); #else while ((ret = aio_suspend(cb, 1, NULL)) != 0 && errno == EINTR) continue; #endif if (ret != 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: aio_suspend() failure: %s, " "offset: %lld, size: %lld\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf)), spaio[done].file_offset, request); break; } if ((size = aio_return(aio)) < 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: aio_return() failure: %s, " "offset: %lld, size: %lld\n", sp_dst->fname, get_error_msg(aio_error(aio), err_buf, sizeof(err_buf)), spaio[done].file_offset, request); break; } if (size != request) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: aio_write() return failure: %s, " "offset: %"PTFlld", " "returned size %"PTFlld " != requested size: %"PTFlld"\n", sp_dst->fname, get_error_msg(aio_error(aio), err_buf, sizeof(err_buf)), spaio[done].file_offset, (int64_t)size, (int64_t)request); break; } aios_completed++; } while (eof && aios_completed < aios_started); if (eof || sp_dst->error_code != 0) break; } if (sp_dst->error_code == 0 && remainder_size != 0) { TRACE("file_writer[%d]: writing remaining %"PTFlld" bytes at offset %"PTFlld"\n", out_index, (int64_t)remainder_size, file_write_offset); #if defined(win_nt) CloseHandle(sp_dst->fd); sp_dst->fd = CreateFile(sp_dst->fname, GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); if (sp_dst->fd == INVALID_HANDLE_VALUE) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: remainder CreateFile() return failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); } else { int err, wr_ret; aio = &spaio[0].aio; aio->sump_over.Offset = (int)file_write_offset; aio->sump_over.OffsetHigh 
= (int)(file_write_offset >> 32); ResetEvent(aio->sump_over.hEvent); wr_ret = WriteFile(sp_dst->fd, remainder_start, remainder_size, NULL, &aio->sump_over); err = GetLastError(); if (wr_ret == 0 && err != ERROR_IO_PENDING) ret = -1; else { wr_ret = GetOverlappedResult(sp_dst->fd, &aio->sump_over, &ret, TRUE); if (wr_ret == 0) ret = -1; } if (ret != remainder_size) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: pwrite() return failure: %s, " "offset: %I64d, " "returned size %d != requested size: %I64d\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf)), file_write_offset, ret, remainder_size); } else file_write_offset += remainder_size; } #else /* close file descriptor that was opened with O_DIRECT */ close(sp_dst->fd); /* reopen file without O_DIRECT */ if ((sp_dst->fd = open(sp_dst->fname, O_WRONLY, 0777)) < 0) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: remainder open() return failure: %s\n", sp_dst->fname, strerror_r(errno, err_buf, sizeof(err_buf))); } else { ret = pwrite(sp_dst->fd, remainder_start, remainder_size, file_write_offset); if (ret != remainder_size) { sp_dst->error_code = SP_FILE_WRITE_ERROR; sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: pwrite() return failure: %s, " "offset: %lld, " "returned size %d != requested size: %lld\n", sp_dst->fname, strerror_r(errno, err_buf, sizeof(err_buf)), file_write_offset, ret, remainder_size); } else file_write_offset += remainder_size; } #endif } #if defined(win_nt) /* truncate output file, since we didn't truncate it when we opened it. 
*/ high = (LONG)(file_write_offset >> 32); low = (LONG)(file_write_offset & 0xFFFFFFFF); ret = SetFilePointer(sp_dst->fd, low, &high, FILE_BEGIN); if (ret == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: SetFilePointer() failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_FILE_WRITE_ERROR; TRACE("file_writer_direct[%d]: SetFilePointer() error: %s\n", out_index, err_buf); } else if (!SetEndOfFile(sp_dst->fd)) { sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: SetEndOfFile() failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_FILE_WRITE_ERROR; TRACE("file_writer_direct[%d]: SetEndOfFile() error: %s\n", out_index, err_buf); } else TRACE("file_writer_direct[%d]: truncated file to %I64d bytes\n", out_index, file_write_offset); VirtualFree(buf, alloc_size, MEM_RELEASE); for (i = 0; i < aio_count; i++) CloseHandle(spaio[i].aio.sump_over.hEvent); CloseHandle(sp_dst->fd); sp_dst->fd = INVALID_HANDLE_VALUE; #else if (ftruncate(sp_dst->fd, file_write_offset)) { sp_raise_error(sp, SP_FILE_WRITE_ERROR, "%s: ftruncate() failure: %s\n", sp_dst->fname, get_error_msg(0, err_buf, sizeof(err_buf))); sp_dst->error_code = SP_FILE_WRITE_ERROR; TRACE("file_writer_direct[%d]: ftruncate() error: %s\n", out_index, err_buf); } munmap(buf, alloc_size); /*close(sp_dst->fd);*/ sp_dst->fd = INVALID_FD; #endif free(spaio); TRACE("file_writer_direct[%d] done: %d\n", out_index, sp_dst->error_code); return NULL; } #endif /* sp_wait - can be called by an external thread, e.g. the thread that * called sp_start(), to wait for all sump pump activity to cease. 
* * Returns: SP_OK or a sump pump error code */ int sp_wait(sp_t sp) { unsigned i; int ret; /* if already waited for this sump pump to complete */ if (sp->wait_done == TRUE) return (sp->error_code); if (sp->flags & SP_SORT) { #if !defined(SUMP_PUMP_NO_SORT) /* if the sort output is going to a file, and we haven't yet * waited for it to complete. */ if (sp->sort_state == SORT_OUTPUT && sp->out[0].file != NULL) { size_t zero = 0; ret = (*Nsort_return_recs)(NULL, &zero, &sp->nsort_ctx); if (ret != NSORT_END_OF_OUTPUT) post_nsort_error(sp, ret); sp->sort_state = SORT_DONE; } #endif pthread_mutex_lock(&sp->sump_mtx); while (sp->error_code == 0 && sp->sort_state != SORT_DONE) { TRACE("sp_wait() condition wait"); pthread_cond_wait(&sp->task_output_ready_cond, &sp->sump_mtx); } pthread_mutex_unlock(&sp->sump_mtx); return (sp->error_code); } else /* normal (non-sort) sump pump */ { TRACE("waiting for input file reader\n"); if (sp->in_file_sp != NULL) { if ((ret = sp_file_wait(sp->in_file_sp)) != SP_OK) return (sp->error_code == SP_OK ? ret : sp->error_code); } for (i = 0; i < sp->num_threads; i++) { TRACE("waiting for pump thread %d\n", i); pthread_join(sp->thread[i], NULL); } for (i = 0; i < sp->num_outputs; i++) { if (sp->out[i].file_sp != NULL) { TRACE("waiting for output %d thread\n", i); if ((ret = sp_file_wait(sp->out[i].file_sp)) != SP_OK) return (sp->error_code == SP_OK ? ret : sp->error_code); } } } sp->wait_done = TRUE; return (sp->error_code); } /* syntax error - internal routine to internally record a syntax error * message for possible later retrieval. 
*/ static void syntax_error(sp_t sp, char *p, char *err_msg) { int ret; const char *fmt = "syntax error: %s at: \"%s\"\n"; if (sp->error_code != 0) /* if prior error */ return; /* ignore this one */ sp->error_code = SP_SYNTAX_ERROR; #if defined(win_nt) ret = _snprintf(sp->error_buf, sp->error_buf_size, fmt, err_msg, p); if (ret == -1) ret = _scprintf(fmt, err_msg, p); #else ret = snprintf(sp->error_buf, sp->error_buf_size, fmt, err_msg, p); #endif if ((size_t)ret >= sp->error_buf_size) { if (sp->error_buf_size != 0) free(sp->error_buf); sp->error_buf_size = (size_t)ret + 1; sp->error_buf = (char *)malloc(sp->error_buf_size); #if defined(win_nt) _snprintf(sp->error_buf, sp->error_buf_size, fmt, err_msg, p); #else snprintf(sp->error_buf, sp->error_buf_size, fmt, err_msg, p); #endif } return; } /* get_numeric_arg - internal routine to convert an ascii number to int64_t */ static int64_t get_numeric_arg(sp_t sp, char **caller_p) { int negative = 0; int64_t result = 0; char *p = *caller_p; if (*p == '-') { p++; negative = 1; } if (*p < '0' || *p > '9') { syntax_error(sp, p, "expected numeric argument"); return 0; } while (*p >= '0' && *p <= '9') { result = result * 10 + (*p - '0'); p++; } *caller_p = p; return (negative ? -result : result); } /* get_scale - internal routine to convert 'k', 'm' of 'g' to a numeric value */ static int64_t get_scale(char **caller_p) { char *p = *caller_p; int64_t factor = 1; if (*p == 'k' || *p == 'K') factor = 1024; else if (*p == 'm' || *p == 'M') factor = 1024 * 1024; else if (*p == 'g' || *p == 'G') factor = 1024 * 1024 * 1024; else return (1); *caller_p = p + 1; return (factor); } /* scan - internal routine to scan the sp_start() string for a keyword. 
*/ static int scan(char *kw, char **dp) { unsigned char *kp; unsigned char *p; kp = (unsigned char *)kw; p = *(unsigned char **)dp; /* while we haven't hit the end of the keyword or the end of the * definition string, and * there is a either a match in the current characters or the keyword * string contains an '_' and if we skip over it the chars match. */ while (*kp != '\0' && *p != '\0' && (toupper(*kp) == toupper(*p) || (*kp == '_' && (kp++, toupper(*kp) == toupper(*p))))) { /* go on to next characters in each string */ kp++; p++; } /* there was a match if we got to the end of the keyword string and * either the last character of the keyword was '=' or the ending * definition string character is not alphabetic nor underscore */ if (*kp == '\0' && (*(kp - 1) == '=' || !((toupper(*p) >= 'A' && toupper(*p) <= 'Z') || *p == '_'))) { /* update definition pointer */ *dp = (char *)p; return TRUE; } else return FALSE; } /* get_file_mods - internal routine to get file name modifiers, * e.g. access mode and transfer size. */ static void get_file_mods(sp_file_t spf, char *mods) { char *p = mods; for (;;) { while (*p == ',') p++; if (*p == '\0') break; else if (scan("BUFFERED", &p) || scan("BUF", &p)) { spf->mode = MODE_BUFFERED; } else if (scan("DIRECT", &p) || scan("DIR", &p)) { spf->mode = MODE_DIRECT; } else if (scan("COUNT", &p) || scan("CO", &p)) { if (*p != ':' && *p != '=') { syntax_error(spf->sp, p, "expected ':' or '=' after 'count'"); return; } p++; spf->aio_count = (int)get_numeric_arg(spf->sp, &p); } else if (scan("TRANSFER", &p) || scan("TRANS", &p) || scan("TR", &p)) { if (*p != ':' && *p != '=') { syntax_error(spf->sp, p, "expected ':' of '='"); return; } p++; spf->transfer_size = (size_t)get_numeric_arg(spf->sp, &p); spf->transfer_size *= (size_t)get_scale(&p); } else { syntax_error(spf->sp, p, "unrecognized file modifier"); break; } } } /* sp_open_file_src - use the specified file as the input for the * specified sump pump. 
* * Parameters: * sp - sp_t identifier for which we are opening the input file. * fname_mods - Name of the file, potentially followed by one or more * of the following modifiers (with no intervening spaces): * ,BUFFERED or ,BUF The file will be read with normal * buffered (not direct) reads. * ,DIRECT or ,DIR The file will be read with direct * and asynchronous reads. * ,TRANSFER=%d{k,m,g} or ,TRANS=%d{k,m,g} or ,TR=%d{k,m,g} * The transfer size (read request size) * is specified in kilo, mega or giga * bytes. * ,COUNT=%d or ,CO=%d The count of the maximum number of * outstanding asynchronous read requests * is given. * Example: * myfilename,dir,trans=4m,co=4 * The above example specifies a file * name of "myfilename", with direct * and asynchronous reads, with a * request size of 4 MB, and a maximum * of 4 outstanding asynchronous read * requests at any time. * * Returns: NULL if an error occurs in opening the file, otherwise * a valid sump pump file structure. */ sp_file_t sp_open_file_src(sp_t sp, const char *fname_mods) { sp_file_t sp_src; char *comma_char; int fname_len; void *(*reader_main)(void *); int is_stdin; int specified_mode; if ((sp_src = (sp_file_t)calloc(1, sizeof(struct sp_file))) == NULL) return (NULL); sp_src->sp = sp; comma_char = strchr(fname_mods, ','); fname_len = (int) (comma_char == NULL ? 
strlen(fname_mods) : comma_char - fname_mods); sp_src->fname = (char *)calloc(1, fname_len + 1); memcpy(sp_src->fname, fname_mods, fname_len); sp_src->fname[fname_len] = '\0'; if (comma_char != NULL) get_file_mods(sp_src, comma_char + 1); specified_mode = sp_src->mode; is_stdin = (strcmp(sp_src->fname, "") == 0); #if defined(win_nt) if (is_stdin) { sp_src->fd = GetStdHandle(STD_INPUT_HANDLE); if (sp_src->transfer_size == 0) /* there is a read size limit for keyboard input, but is this it?*/ sp_src->transfer_size = 8192; sp_src->mode = MODE_BUFFERED; } else { sp_src->fd = CreateFile(sp_src->fname, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); if (sp_src->fd == INVALID_HANDLE_VALUE) return (NULL); sp_src->can_seek = (GetFileType(sp_src->fd) == FILE_TYPE_DISK); } #else if (is_stdin) { sp_src->fd = 0; sp_src->mode = MODE_BUFFERED; } else { struct stat buf; sp_src->fd = open(sp_src->fname, 0); if (sp_src->fd < 0) return (NULL); if (fstat(sp_src->fd, &buf) != 0) return (NULL); sp_src->can_seek = S_ISREG(buf.st_mode); } #endif /* if direct mode was specified, but it is not a file */ if (specified_mode == MODE_DIRECT && !sp_src->can_seek) { start_error(sp, "direct mode reads were requested for file %s, but it" " is either not a normal file or stdin\n", sp_src->fname); return (NULL); } if (sp_src->mode == MODE_UNSPECIFIED) sp_src->mode = sp_src->can_seek ? Default_file_mode : MODE_BUFFERED; /* if file mode is direct (whether by specification or default) */ if (sp_src->mode == MODE_DIRECT) { /* if a transfer size has been specified that is not a multiple of the * page size, then silently revert to buffered mode. * We probably should instead stay with direct mode, allocate separate * read buffers, and copy the data into the sump pump input buffers. 
*/ if (sp_src->transfer_size != 0 && sp_src->transfer_size % PAGE_SIZE != 0) { sp_src->mode = MODE_BUFFERED; start_error(sp, "direct mode reads for file %s," " but the specified transfer size is not a " "multiple of the page size\n", sp_src->fname); return (NULL); } /* if transfer size has been set and the input buffer size is not a * multiple of it, then default to buffered. This is because the * input buffers are directly read into. * We could use vectored reads to eliminate this restriction. */ if (sp_src->transfer_size != 0 && sp->in_buf_size % sp_src->transfer_size != 0) { sp_src->mode = MODE_BUFFERED; start_error(sp, "direct mode reads for file %s, but " "the sump pump input buffer size is not a multiple " "of the specified transfer size\n", sp_src->fname); return (NULL); } /* if the input buffer size is not a multiple of the page size */ if (sp->in_buf_size % PAGE_SIZE != 0) { /* flag error if direct was specified, otherwise use buffered */ if (specified_mode == MODE_DIRECT) { start_error(sp, "direct mode reads were specified for file %s, " "but the sump pump input buffer size is not a " "multiple of the page size\n", sp_src->fname); return (NULL); } sp_src->mode = MODE_BUFFERED; } } #if defined(AIO_CAPABLE) if (sp_src->can_seek && sp_src->mode == MODE_DIRECT) { reader_main = file_reader_direct; if (sp_src->transfer_size == 0) sp_src->transfer_size = 512 * 1024; /* probably should be larger * for Windows. 
*/ if (sp_src->aio_count == 0) sp_src->aio_count = 4; /* now close file and reopen it as direct */ # if defined(win_nt) CloseHandle(sp_src->fd); sp_src->fd = CreateFile(sp_src->fname, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL); if (sp_src->fd == INVALID_HANDLE_VALUE) return (NULL); # else close(sp_src->fd); sp_src->fd = open(sp_src->fname, O_DIRECT); if (sp_src->fd < 0) return (NULL); # endif } else #endif { if (Default_rw_test_size != 0) { /* test mode for some regression tests */ reader_main = file_reader_test; sp_src->transfer_size = Default_rw_test_size; } else reader_main = file_reader_buffered; if (sp_src->transfer_size == 0) { if (sp_src->can_seek) /* if normal file */ sp_src->transfer_size = DEFAULT_BUFFERED_TRANSFER_SIZE; else sp_src->transfer_size = DEFAULT_PIPE_TRANSFER_SIZE; } } /* create reader thread */ if (pthread_create(&sp_src->thread, NULL, reader_main, sp_src) != 0) return (NULL); return (sp_src); } /* sp_open_file_dst - use the specified file as the output for the * specified output of the specified sump pump. * * Parameters: * sp - sp_t identifier for which we are opening an output file. * out_index - Integer indicating the sump pump output index. * fname_mods - Name of the file, potentially followed by one or more * of the following modifiers (with no intervening spaces): * ,BUFFERED or ,BUF The file will be written with normal * buffered (not direct) writes. * ,DIRECT or ,DIR The file will be written with direct * and asynchronous writes. * ,TRANSFER=%d{k,m,g} or ,TRANS=%d{k,m,g} or ,TR=%d{k,m,g} * The transfer size (write request size) * is specified in kilo, mega or giga * bytes. * ,COUNT=%d or ,CO=%d The count of the maximum number of * outstanding asynchronous write requests * is given. 
* Example: * myfilename,dir,trans=4m,co=4 * The above example specifies a file * name of "myfilename", with direct * and asynchronous writes, with a * request size of 4 MB, and a maximum * of 4 outstanding asynchronous write * requests at any time. * * Returns: NULL if an error occurs in opening the file, otherwise * a valid sump pump file structure. */ sp_file_t sp_open_file_dst(sp_t sp, unsigned out_index, const char *fname_mods) { sp_file_t sp_dst; int ret; char *comma_char; int fname_len; void *(*writer_main)(void *); int specified_mode; if ((sp_dst = (sp_file_t)calloc(1, sizeof(struct sp_file))) == NULL) return (NULL); sp_dst->sp = sp; comma_char = strchr(fname_mods, ','); fname_len = (int) (comma_char == NULL ? strlen(fname_mods) : comma_char - fname_mods); sp_dst->fname = (char *)calloc(1, fname_len + 1); memcpy(sp_dst->fname, fname_mods, fname_len); sp_dst->fname[fname_len] = '\0'; if (comma_char != NULL) get_file_mods(sp_dst, comma_char + 1); specified_mode = sp_dst->mode; #if defined(win_nt) if (strcmp(sp_dst->fname, "") == 0) sp_dst->fd = GetStdHandle(STD_OUTPUT_HANDLE); else if (strcmp(sp_dst->fname, "") == 0) sp_dst->fd = GetStdHandle(STD_ERROR_HANDLE); else { sp_dst->fd = CreateFile(sp_dst->fname, GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); if (sp_dst->fd == INVALID_HANDLE_VALUE) return (NULL); sp_dst->can_seek = (GetFileType(sp_dst->fd) == FILE_TYPE_DISK); } #else if (strcmp(sp_dst->fname, "") == 0) sp_dst->fd = 1; else if (strcmp(sp_dst->fname, "") == 0) sp_dst->fd = 2; else { struct stat buf; sp_dst->fd = open(sp_dst->fname, O_WRONLY | O_CREAT, 0777); if (sp_dst->fd < 0) return (NULL); if (fstat(sp_dst->fd, &buf) != 0) return (NULL); sp_dst->can_seek = S_ISREG(buf.st_mode); } #endif /* if direct mode was specified but file is not a normal file */ if (specified_mode == MODE_DIRECT && !sp_dst->can_seek) { start_error(sp, "direct mode writes were requested for file %s, but it" " is either not a file or 
stdout/stderr\n", sp_dst->fname); return (NULL); } if (sp_dst->mode == MODE_UNSPECIFIED) sp_dst->mode = sp_dst->can_seek ? Default_file_mode : MODE_BUFFERED; /* if file mode is direct (whether by specification or default) */ if (sp_dst->mode == MODE_DIRECT) { /* if a transfer size has been specified that is not a multiple of the * page size, then silently revert to buffered mode. * We probably should instead stay with direct mode, allocate separate * read buffers, and copy the data into the sump pump input buffers. */ if (sp_dst->transfer_size != 0 && sp_dst->transfer_size % PAGE_SIZE != 0) { sp_dst->mode = MODE_BUFFERED; start_error(sp, "direct mode writes for file %s" ", but the transfer size is not a " "multiple of the page size\n", sp_dst->fname); return (NULL); } } #if defined(AIO_CAPABLE) if (sp_dst->can_seek && sp_dst->mode == MODE_DIRECT) { writer_main = file_writer_direct; if (sp_dst->transfer_size == 0) sp_dst->transfer_size = 512 * 1024; if (sp_dst->aio_count == 0) sp_dst->aio_count = 4; /* now close file and reopen it as direct */ # if defined(win_nt) CloseHandle(sp_dst->fd); sp_dst->fd = CreateFile(sp_dst->fname, GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL); if (sp_dst->fd == INVALID_HANDLE_VALUE) return (NULL); # else close(sp_dst->fd); sp_dst->fd = open(sp_dst->fname, O_DIRECT | O_WRONLY | O_CREAT, 0777); if (sp_dst->fd < 0) return (NULL); # endif } else #endif { writer_main = file_writer_buffered; if (sp_dst->transfer_size == 0) { if (Default_rw_test_size != 0) sp_dst->transfer_size = Default_rw_test_size; else if (sp_dst->can_seek) /* if normal file */ sp_dst->transfer_size = DEFAULT_BUFFERED_TRANSFER_SIZE; else sp_dst->transfer_size = DEFAULT_PIPE_TRANSFER_SIZE; } } sp_dst->out_index = out_index; /* create writer thread */ if ((ret = pthread_create(&sp_dst->thread, NULL, writer_main, sp_dst))) die("sp_open_file_dst: pthread_create() ret: %d\n", ret); return (sp_dst); } /* sp_file_wait - 
wait for the specified file connection to complete. * * Returns: SP_OK or a sump pump error code */ int sp_file_wait(sp_file_t sp_file) { if (sp_file->wait_done == TRUE) return (sp_file->error_code); pthread_join(sp_file->thread, NULL); sp_file->wait_done = TRUE; return (sp_file->error_code); } /* check_task_done - internal routine to make sure there is room for at * least one new task. * Caller must have locked sump_mtx. */ static void check_task_done(sp_t sp) { sp_task_t t; TRACE("check_task_done() called\n"); /* while there are tasks which have not yet been recognized as done. */ if (sp->cnt_task_init > sp->cnt_task_done) { /* while there is no room for a new task */ while (sp->error_code == 0 && (sp->cnt_task_init > sp->cnt_task_drained + sp->num_tasks - 1)) { TRACE("check_task_done() condition wait for task %d\n", sp->cnt_task_drained); pthread_cond_wait(&sp->task_drained_cond, &sp->sump_mtx); } if (sp->error_code != 0) { TRACE("check_task_done() returning because of error_code: %d\n", sp->error_code); return; } /* Verify the actual ending position of each done task * matches its expected ending position. */ while (sp->cnt_task_drained > sp->cnt_task_done) { t = &sp->task[sp->cnt_task_done % sp->num_tasks]; TRACE("check_task_done() task %d verify\n", sp->cnt_task_done); if (t->curr_in_buf_index != t->expected_end_index || (t->curr_rec - t->in_buf) != t->expected_end_offset) { die("task %d input ending mismatch: " "ci %d, ei %d, ao %d, eo %d\n", sp->cnt_task_done, t->curr_in_buf_index, t->expected_end_index, (t->curr_rec - t->in_buf), t->expected_end_offset); } sp->cnt_task_done++; } } TRACE("check_task_done() returning\n"); } /* init_new_task - internal routine called by sp_write_input() to * initialize a new sump pump task. 
 */
/* Parameters:
 *      sp -       sump pump the task belongs to.
 *      ib -       input buffer in which the task's input begins.
 *      curr_rec - position inside ib->in_buf where the task's input begins.
 *
 * The task slot is chosen round-robin from sp->task[] using
 * sp->cnt_task_init, which is incremented here.  The task's starting
 * buffer index is taken as sp->cnt_in_buf_readable - 1, i.e. the most
 * recently released input buffer.
 *
 * Returns: the initialized task.
 */
static sp_task_t init_new_task(sp_t sp, in_buf_t *ib, char *curr_rec)
{
    sp_task_t   t;
    unsigned    i;

    t = &sp->task[sp->cnt_task_init % sp->num_tasks];
    t->task_number = sp->cnt_task_init;
    /* record starting ib and offset */
    t->in_buf = ib->in_buf;
    /* if the size is 0, this empty task indicates EOF */
    t->in_buf_bytes = ib->in_buf_bytes;
    t->curr_rec = curr_rec;
    t->begin_rec = t->curr_rec;
    t->curr_in_buf_index = sp->cnt_in_buf_readable - 1;
    t->begin_in_buf_index = t->curr_in_buf_index;
    /* the expected end is not known yet; it is filled in when the
     * starting point of the following task is determined.
     */
    t->expected_end_index = -1;
    t->expected_end_offset = -1;
    t->first_in_buf = TRUE;
    t->outs_drained = 0;
    /* start with empty, unstalled output buffers */
    for (i = 0; i < sp->num_outputs; i++)
    {
        t->out[i].bytes_copied = 0;
        t->out[i].stalled = FALSE;
    }
    t->input_eof = FALSE;
    t->output_eof = FALSE;
    sp->cnt_task_init++;
    return (t);
}


/* new_in_buf - wait, if necessary, until an in_buf is available to be filled
 *              with input data.
 *
 * Takes and releases sump_mtx itself.  Returns as soon as a free buffer
 * exists, the oldest busy buffer has been recognized as done, or an
 * error has been raised; callers check sp->error_code afterwards.
 */
static void new_in_buf(sp_t sp)
{
    in_buf_t            *ib = NULL;

    TRACE("new_in_buf: waiting for buffer\n");
    pthread_mutex_lock(&sp->sump_mtx);
    /* while no error and all input buffers are still in use */
    while (sp->error_code == 0 &&
           sp->cnt_in_buf_readable >= sp->cnt_in_buf_done + sp->num_in_bufs)
    {
        /* get oldest buffer not yet recognized as done */
        ib = &sp->in_buf[sp->cnt_in_buf_done % sp->num_in_bufs];
        /* if all readers of this input buffer are done reading */
        if (ib->num_readers == ib->num_readers_done)
        {
            sp->cnt_in_buf_done++;
            break;
        }
        pthread_cond_wait(&sp->in_buf_done_cond, &sp->sump_mtx);
    }
    pthread_mutex_unlock(&sp->sump_mtx);
}


/* eof_without_new_in_buf_or_task - clean eof, no need to flush an input
 *                                  buffer or start a new task.
 *
 * Sets sp->input_eof under sump_mtx and wakes every thread that may be
 * waiting for input, a task, or task output.
 */
static void eof_without_new_in_buf_or_task(sp_t sp)
{
    pthread_mutex_lock(&sp->sump_mtx);
    sp->input_eof = TRUE;
    /* wake sump thread waiting for next input buffer (just signal OK?) */
    pthread_cond_broadcast(&sp->in_buf_readable_cond);
    /* wake all sump threads waiting for new task */
    pthread_cond_broadcast(&sp->task_avail_cond);
    /* wake writer thread as it should exit on EOF */
    pthread_cond_broadcast(&sp->task_output_ready_cond);
    pthread_mutex_unlock(&sp->sump_mtx);
}


/* flush_in_buf - flush an input buffer and start a new task if necessary
 *
 * Parameters:
 *      sp -        sump pump whose current input buffer is released.
 *      buf_bytes - number of valid bytes in the buffer.
 *      eof -       non-zero if this is the last input buffer.
 *
 * The buffer released is sp->in_buf[cnt_in_buf_readable % num_in_bufs].
 * The routine first locates (without the mutex) the position in the
 * buffer where a new task could start, then, under sump_mtx, makes the
 * buffer readable, records the expected end of the previous task and,
 * if a starting point was found (or at eof), initializes a new task.
 */
static void flush_in_buf(sp_t sp, size_t buf_bytes, int eof)
{
    in_buf_t            *ib;
    sp_task_t           t;
    char                *curr_rec;
    char                *p;

    /* get ready to release in_buf to pump threads executing pump funcs */
    /* initially, no readers (there will be at least one) */
    ib = &sp->in_buf[sp->cnt_in_buf_readable % sp->num_in_bufs];
    ib->num_readers = 0;
    ib->num_readers_done = 0;
    curr_rec = ib->in_buf;
    ib->in_buf_bytes = buf_bytes;
    sp->in_buf_current_bytes = 0;

    /* if this is not the first buffer and we are not processing
     * whole buffers.
     */
    if (sp->cnt_in_buf_readable != 0 && REC_TYPE(sp) != SP_WHOLE_BUF)
    {
        /* if we are grouping record by key values */
        if (sp->flags & SP_GROUP_BY)
        {
            /* the pump thread performing the most previously
             * issued task will always read this input buffer in
             * order to find the end of its input
             */
            ib->num_readers = 1;
            switch (REC_TYPE(sp))
            {
              case SP_UTF_8:
                /* scan until either we find a record whose key
                 * difference index is less than the number of
                 * "group by" keys, or we scan to the end of the buffer.
                 */
                while (curr_rec < ib->in_buf + ib->in_buf_bytes)
                {
                    /* if this is not the beginning of the buffer or
                     * the buffer begins with a whole record.
                     */
                    if (curr_rec != ib->in_buf ||
                        sp->prev_in_buf_ending_rec_partial_bytes == 0)
                    {
                        /* if match character indicates a new key
                         * grouping, then stop as this the boundary
                         * point between tasks.
                         * NOTE(review): the first byte of each record
                         * is tested against '0' here - presumably a
                         * match character prepended upstream; confirm.
                         */
                        if (*curr_rec == '0')
                            break;
                    }
                    /* find next instance of newline in buffer, if any */
                    curr_rec = memchr(curr_rec, *(char *)sp->delimiter,
                                      ib->in_buf_bytes -
                                      (curr_rec - ib->in_buf));
                    if (curr_rec == NULL)
                    {
                        /* no task boundary in this buffer */
                        curr_rec = ib->in_buf + ib->in_buf_bytes;
                        break;
                    }
                    curr_rec++; /* step over newline */
                }
                break;

              case SP_FIXED:
                /* records occupy rec_size + 1 bytes here (the extra
                 * byte being the leading match character tested below);
                 * skip the remainder of any record split across buffers.
                 */
                if (sp->prev_in_buf_ending_rec_partial_bytes == 0)
                    curr_rec = ib->in_buf;
                else
                    curr_rec = ib->in_buf + (sp->rec_size + 1) -
                        sp->prev_in_buf_ending_rec_partial_bytes;
                while (curr_rec < ib->in_buf + ib->in_buf_bytes)
                {
                    if (*curr_rec == '0')
                        break;
                    curr_rec += sp->rec_size + 1;
                }
                break;
            }
        }
        else /* we are not grouping records by key value */
        {
            /* if this in buffer starts with a partial record, then
             * the pump thread performing the previous issued task
             * will read this buffer in order to find the end of its
             * input
             */
            if (sp->prev_in_buf_ending_rec_partial_bytes != 0)
            {
                /* the previously issued task is required to read the
                 * record remainder at the beginning of this buffer.
                 */
                ib->num_readers = 1;
                switch (REC_TYPE(sp))
                {
                  case SP_UTF_8:
                    /* find next instance of newline in buffer, if any */
                    curr_rec = memchr(curr_rec, *(char *)sp->delimiter,
                                      ib->in_buf_bytes -
                                      (curr_rec - ib->in_buf));
                    if (curr_rec == NULL)
                        curr_rec = ib->in_buf + ib->in_buf_bytes;
                    else
                        curr_rec++; /* step over newline */
                    break;

                  case SP_FIXED:
                    /* skip the bytes that complete the split record */
                    curr_rec = ib->in_buf +
                        (sp->rec_size -
                         sp->prev_in_buf_ending_rec_partial_bytes);
                    break;
                }
            }
        }
    }

    /* update the count of partial-record bytes at the end of this
     * buffer, for use when the next buffer is flushed.
     */
    switch (REC_TYPE(sp))
    {
      case SP_UTF_8:
        /* determine how many bytes there are in any partial record at
         * the end of this buffer.  search backwards to find last newline
         * character.
         */
        for (p = ib->in_buf + ib->in_buf_bytes - 1; p >= ib->in_buf; p--)
            if (*p == *(char *)sp->delimiter)
                break;
        if (p < ib->in_buf)  /* if there was no newline */
        {
            /* add entire buffer size to this partial record */
            sp->prev_in_buf_ending_rec_partial_bytes += ib->in_buf_bytes;
        }
        else
            sp->prev_in_buf_ending_rec_partial_bytes =
                (ib->in_buf + ib->in_buf_bytes - 1) - p;
        break;

      case SP_FIXED:
        /* record length includes the extra leading byte when grouping */
        sp->prev_in_buf_ending_rec_partial_bytes =
            (sp->prev_in_buf_ending_rec_partial_bytes + ib->in_buf_bytes) %
            (sp->rec_size + ((sp->flags & SP_GROUP_BY) ? 1 : 0));
        break;

      case SP_WHOLE_BUF:
        /* every buffer is a complete unit and starts a new task */
        sp->prev_in_buf_ending_rec_partial_bytes = 0;
        curr_rec = ib->in_buf;
        break;
    }

    TRACE("flush_in_buf: ib %d readable with %d bytes\n",
          sp->cnt_in_buf_readable, ib->in_buf_bytes);

    pthread_mutex_lock(&sp->sump_mtx);
    /* make input buffer available to any existing task.
     * in theory there should be only one task waiting
     */
    sp->cnt_in_buf_readable++;
    pthread_cond_broadcast(&sp->in_buf_readable_cond);

    if (eof)
    {
        /* if we found the starting point for a new task, then add a
         * reader for the task that will start its input with this
         * input buffer.
         */
        if (curr_rec < ib->in_buf + ib->in_buf_bytes)
            ib->num_readers++;

        /* set the expected end point for the previous task as the
         * actual begin point for the task we are about to define.
         */
        if (sp->cnt_task_init != 0)  /* if there was a previous task */
        {
            t = &sp->task[(sp->cnt_task_init - 1) % sp->num_tasks];
            /* if we just made available a non-empty in_buf
             * that did NOT start a new task.
             */
            if (ib->in_buf_bytes != 0 &&
                curr_rec >= ib->in_buf + ib->in_buf_bytes)
            {
                /* the expected ending in_buf is the next (and empty)
                 * one to be issued.
                 */
                t->expected_end_index = sp->cnt_in_buf_readable;
                t->expected_end_offset = 0;
                /* release the mutex first: the eof routine below
                 * locks sump_mtx itself.
                 */
                pthread_mutex_unlock(&sp->sump_mtx);
                /* do not issue a new task here. */
                eof_without_new_in_buf_or_task(sp);
                return;
            }
            else
            {
                /* the expected ending in_buf is the one just issued
                 * and the offset is the end.
                 */
                t->expected_end_index = sp->cnt_in_buf_readable - 1;
                t->expected_end_offset = (int)(curr_rec - ib->in_buf);
            }
            /* Note: it is possible that at this point the previous
             * task has already completed.  This is why tasks should
             * not use their expected ending point to confirm that
             * they ended at the right point.  This thread should
             * perform the check after the task has completed.
             */
        }
        /* make sure there is at least one available task struct */
        check_task_done(sp);
        if (sp->error_code != 0)
        {
            pthread_mutex_unlock(&sp->sump_mtx);
            return;
        }

        TRACE("flush_in_buf: initializing task %d\n", sp->cnt_task_init);
        t = init_new_task(sp, ib, curr_rec);
        sp->input_eof = TRUE;
        /* wake all sump threads */
        pthread_cond_broadcast(&sp->task_avail_cond);
        /* wake writer thread as it should exit on EOF */
        pthread_cond_broadcast(&sp->task_output_ready_cond);
        pthread_mutex_unlock(&sp->sump_mtx);
        return;
    }

    /* if we found the ending point for a task (or this was the
     * very first input read), then start a new task.
     */
    if (curr_rec < ib->in_buf + ib->in_buf_bytes)
    {
        /* add a reader for the task that will start its input with
         * this input buffer.
         */
        ib->num_readers++;
        /* set the expected end point for the previous task as the
         * actual begin point for the task we are about to define.
         */
        if (sp->cnt_task_init != 0)  /* if there was a previous task */
        {
            t = &sp->task[(sp->cnt_task_init - 1) % sp->num_tasks];
            /* the expected ending in_buf is the one just issued
             * and the offset is the start of the new task's input.
             */
            t->expected_end_index = sp->cnt_in_buf_readable - 1;
            t->expected_end_offset = (int)(curr_rec - ib->in_buf);
            /* Note: it is possible that at this point the previous
             * task has already completed.  This is why tasks should
             * not use their expected ending point to confirm that
             * they ended at the right point.  This thread should
             * perform the check after the task has completed.
             */
        }
        /* make sure there is at least one available task struct */
        check_task_done(sp);
        if (sp->error_code != 0)
        {
            pthread_mutex_unlock(&sp->sump_mtx);
            return;
        }

        TRACE("flush_in_buf: initializing task %d\n", sp->cnt_task_init);
        t = init_new_task(sp, ib, curr_rec);
        /* wake 1 sump thread */
        pthread_cond_signal(&sp->task_avail_cond);
    }
    pthread_mutex_unlock(&sp->sump_mtx);
    return;
}


/* sp_write_input - write data that is the input to a sump pump.
 *                  A write size of 0 indicates input EOF.
 *
 * Parameters:
 *      sp -   sump pump to receive the input.
 *      buf -  the input bytes.
 *      size - number of bytes in buf; 0 indicates EOF and a negative
 *             value indicates that the caller (upstream) had an error.
 *
 * Returns: the number of bytes accepted (size) on success, 0 for an
 *          EOF/upstream-error indication or when input is ignored, and
 *          -1 if the sump pump has an error.
 */
ssize_t sp_write_input(sp_t sp, void *buf, ssize_t size)
{
    size_t      src_remaining = size;
    size_t      dst_remaining;
    size_t      trans_size;
    char        *trans_src;
    char        *trans_dst;
    in_buf_t    *ib;
#if !defined(SUMP_PUMP_NO_SORT)
    int         ret;
#endif

    TRACE("sp_write_input: size %d\n", size);
    if (size <= 0 && sp->input_eof)
        return (0);  /* ignore, already eof */

    if (sp->error_code)
        return (-1);  /* already error */

    if (size <= 0)
    {
        if (size < 0)
        {
            /* TO-DO: need to make sure a partial input record does not
             * cause a sump hang instead of a sump error.  A sump hang
             * shouldn't matter if sump pump invoker waits for sump
             * pumps in upstream-to-downstream order
             */
            sp->error_code = SP_UPSTREAM_ERROR;
            /* wake every waiting thread so it can see the error */
            pthread_mutex_lock(&sp->sump_mtx);
            broadcast_all_conds(sp);
            pthread_mutex_unlock(&sp->sump_mtx);
            size = 0;  /* act as if normal eof */
#if !defined(SUMP_PUMP_NO_SORT)
            if (sp->flags & SP_SORT)
            {
                (*Nsort_end)(&sp->nsort_ctx);
            }
#endif
            return (0);  /* caller is indicating error, return OK status */
        }
        /* else size == 0 */
#if !defined(SUMP_PUMP_NO_SORT)
        if (sp->flags & SP_SORT)
        {
            /* tell nsort there is no more input, then switch the sort
             * to its output phase.
             */
            ret = (*Nsort_release_end)(&sp->nsort_ctx);
            if (ret < 0)        /* if error */
            {
                post_nsort_error(sp, ret);
                return (-1);
            }
            else
            {
                pthread_mutex_lock(&sp->sump_mtx);
                sp->sort_state = SORT_OUTPUT;
                pthread_cond_broadcast(&sp->task_output_ready_cond);
                pthread_mutex_unlock(&sp->sump_mtx);
                return (0);
            }
        }
#endif
        /* for non-sort case, fall through */
    }

#if !defined(SUMP_PUMP_NO_SORT)
    if (sp->flags & SP_SORT)
    {
        /* sort input is handed directly to nsort; it is ignored once
         * the sort has left its input phase.
         */
        if (sp->sort_state != SORT_INPUT)
            return (0);
        ret = (*Nsort_release_recs)(buf, size, &sp->nsort_ctx);
        if (ret < 0)    /* if error */
            post_nsort_error(sp, ret);
        return (ret == NSORT_SUCCESS ? size : 0);
    }
#endif

    /* if EOF and there isn't a partially filled input buffer needing release.
     */
    if (src_remaining == 0 && sp->in_buf_current_bytes == 0)
    {
        eof_without_new_in_buf_or_task(sp);
        return (0);
    }

    /* while this is not EOF or there is a partially filled input buffer */
    while (src_remaining != 0 || sp->in_buf_current_bytes != 0)
    {
        /* if it is NOT the case that we have already partially filled an
         * input buffer that has not yet been released to the sump pump,
         * then get a new input buffer.
         */
        if (sp->in_buf_current_bytes == 0)
        {
            new_in_buf(sp);
            if (sp->error_code != 0)
                return (-1);
        }
        ib = &sp->in_buf[sp->cnt_in_buf_readable % sp->num_in_bufs];
        TRACE("sp_write_input: readable: %d, partial: %d\n",
              sp->cnt_in_buf_readable, sp->in_buf_current_bytes);

        /* copy as much of the caller's remaining data as fits in the
         * remaining space of the current input buffer.
         */
        trans_src = (char *)buf + size - src_remaining;
        trans_size = src_remaining;
        trans_dst = ib->in_buf + sp->in_buf_current_bytes;
        dst_remaining = ib->in_buf_size - sp->in_buf_current_bytes;
        if (trans_size > dst_remaining)
            trans_size = dst_remaining;
        memcpy(trans_dst, trans_src, trans_size);
        src_remaining -= trans_size;
        sp->in_buf_current_bytes += trans_size;
        dst_remaining = ib->in_buf_size - sp->in_buf_current_bytes;

        /* if this isn't EOF and there is more space remaining the in_buf,
         * then return so caller can write more bytes or declare EOF.
         */
        if (size != 0 && dst_remaining > 0)
        {
            /* space left in the buffer implies all source was copied */
            if (src_remaining != 0)
                die("sp_write_input: dst_remaining %d and src_remaining %d\n",
                    dst_remaining, src_remaining);
            TRACE("sp_write_input() returning %d, dst_remaining: %d\n",
                  size, dst_remaining);
            return size;
        }
        /* buffer is full, or this is EOF with a partial buffer: release
         * it.  flush_in_buf() resets in_buf_current_bytes to 0.
         */
        flush_in_buf(sp, sp->in_buf_current_bytes, size == 0);
    }
    if (sp->error_code)
        size = -1;
    TRACE("sp_write_input: returning %d\n", size);
    return (size);
}


/* sp_get_in_buf - get a pointer to an input buffer that an external
 *                 thread can fill with input data.
* * Returns: SP_OK or a sump pump error code */ int sp_get_in_buf(sp_t sp, uint64_t buf_index, void **buf, size_t *size) { in_buf_t *ib; if (sp->flags & SP_SORT) return (SP_SORT_INCOMPATIBLE); *buf = NULL; *size = 0; TRACE("sp_get_in_buf: waiting for input buffer\n"); if (buf_index < sp->cnt_in_buf_readable || buf_index >= sp->cnt_in_buf_readable + sp->num_in_bufs) { return (SP_BUF_INDEX_ERROR); } pthread_mutex_lock(&sp->sump_mtx); while (sp->error_code == 0 && buf_index >= sp->cnt_in_buf_done + sp->num_in_bufs) { /* get oldest buffer not yet recognized as done */ ib = &sp->in_buf[sp->cnt_in_buf_done % sp->num_in_bufs]; /* if all readers of this input buffer are done reading */ if (ib->num_readers == ib->num_readers_done) { sp->cnt_in_buf_done++; continue; } pthread_cond_wait(&sp->in_buf_done_cond, &sp->sump_mtx); } if (sp->error_code == 0) { ib = &sp->in_buf[buf_index % sp->num_in_bufs]; *buf = ib->in_buf; *size = ib->in_buf_size; } pthread_mutex_unlock(&sp->sump_mtx); if (sp->error_code) return (sp->error_code); return (SP_OK); } /* sp_put_in_buf_bytes - flush bytes that have been placed in a sump * pump's input buffer by an external thread. * This function should only be used by first * calling sp_get_in_buf(). * * Returns: SP_OK or a sump pump error code */ int sp_put_in_buf_bytes(sp_t sp, uint64_t buf_index, size_t size, int eof) { if (sp->flags & SP_SORT) return (SP_SORT_INCOMPATIBLE); if (buf_index != sp->cnt_in_buf_readable) return (SP_BUF_INDEX_ERROR); if (size == 0) eof_without_new_in_buf_or_task(sp); else flush_in_buf(sp, size, eof); if (sp->error_code) return (sp->error_code); return (SP_OK); } /* sp_get_error - get the error code of a sump pump. * * Returns: SP_OK if no error has occurred, otherwise the error code. */ int sp_get_error(sp_t sp) { return (sp->error_code); } /* pfunc_get_thread_index - can be used by pump functions to get the * index of the sump pump thread executing the * pump func. 
For instance, if there are 4 * sump pump threads, this function will return * 0-3 depending on which of the 4 threads is * invoking it. * * Returns: The index of the requesting sump pump thread. */ int pfunc_get_thread_index(sp_task_t t) { return (t->thread_index); } /* pfunc_get_task_number - can be used by pump functions to get the sump * pump task number being executed by the pump * function. This number starts at 0 and * increases with each subsequent task * issued/started by the sump pump. * * Returns: The task number (starting with 0 as the first task) that the * calling thread is currently executing. */ uint64_t pfunc_get_task_number(sp_task_t t) { return (t->task_number); } /* pfunc_write - write function that can be used by a pump function to * write the output data for the pump function. * * Returns: the number of bytes written. If this is not the same as the * requested size, an error has occurred. */ size_t pfunc_write(sp_task_t t, unsigned out_index, void *buf, size_t size) { char *src = (char *)buf; size_t bytes_left = size; size_t copy_bytes; sp_t sp = t->sp; struct task_out *out = t->out + out_index; if (sp->error_code != SP_OK) return (0); if (out_index >= sp->num_outputs) { pfunc_error(t, "pfunc_write, out_index %d >= num_outputs %d\n", out_index, sp->num_outputs); sp->error_code = SP_OUTPUT_INDEX_ERROR; return (0); } /* while can't fit existing map output buffer contents and the new record * into the map output buffer. 
*/ while (bytes_left + out->bytes_copied > out->size) { copy_bytes = out->size - out->bytes_copied; if (copy_bytes) { memmove(out->buf + out->bytes_copied, src, copy_bytes); src += copy_bytes; bytes_left -= copy_bytes; out->bytes_copied += copy_bytes; } TRACE("pfunc_write: waking output reader\n"); pthread_mutex_lock(&sp->sump_mtx); out->stalled = TRUE; pthread_cond_broadcast(&sp->task_output_ready_cond); TRACE("pfunc_write: waiting for available output buffer\n"); while (out->stalled && sp->error_code == 0) pthread_cond_wait(&sp->task_output_empty_cond, &sp->sump_mtx); pthread_mutex_unlock(&sp->sump_mtx); if (sp->error_code != 0) return (-1); } /* copy new record into buffer */ memmove(out->buf + out->bytes_copied, src, bytes_left); out->bytes_copied += bytes_left; return (size); } /* pfunc_mutex_lock - lock the auto-allocated mutex for pump functions. */ void pfunc_mutex_lock(sp_task_t t) { pthread_mutex_lock(&t->sp->sp_mtx); } /* pfunc_mutex_unlock - unlock the auto-allocated mutex for pump functions. */ void pfunc_mutex_unlock(sp_task_t t) { pthread_mutex_unlock(&t->sp->sp_mtx); } /* done_reading_in_buf - internal routine to mark the task's current * input buffer as done for reading. This is * non-trivial because multiple pump threads may * have to read the same input buffer. */ static void done_reading_in_buf(sp_task_t t, int move_to_next_in_buf) { in_buf_t *ib; sp_t sp = t->sp; pthread_mutex_lock(&sp->sump_mtx); /* bump up the count of done readers for the input buffer */ ib = &sp->in_buf[t->curr_in_buf_index % sp->num_in_bufs]; ib->num_readers_done++; /* if all readers are now done, signal the reader thread */ if (ib->num_readers == ib->num_readers_done) pthread_cond_broadcast(&sp->in_buf_done_cond); pthread_mutex_unlock(&sp->sump_mtx); if (move_to_next_in_buf) { /* move this task's current input position to the beginning of * the next input buffer. 
*/ t->curr_in_buf_index++; /* on to next input buffer */ ib = &sp->in_buf[t->curr_in_buf_index % sp->num_in_bufs]; t->in_buf = ib->in_buf; t->in_buf_bytes = 0; t->curr_rec = t->in_buf; } } /* ready_in_buf - internal routine to insure the current input buffer * for a task is ready for reading. */ static void ready_in_buf(sp_task_t t) { in_buf_t *ib; sp_t sp = t->sp; pthread_mutex_lock(&sp->sump_mtx); t->first_in_buf = FALSE; /* now that this task has proceeded beyond * its first input buffer, it should stop * at end of the current key group */ while (sp->error_code == 0 && !sp->input_eof && t->curr_in_buf_index == sp->cnt_in_buf_readable) /*not yet readable*/ { pthread_cond_wait(&sp->in_buf_readable_cond, &sp->sump_mtx); } /* if sp_write_input() has indicated eof and no more readable buffers * then that indicates eof for this task. */ if (sp->error_code != 0 || (sp->input_eof && t->curr_in_buf_index == sp->cnt_in_buf_readable)) { t->input_eof = TRUE; TRACE("pump%d: eof: error_code: %d, in_buf_bytes is 0 at ib %d\n", t->thread_index, sp->error_code, t->curr_in_buf_index); t->in_buf = NULL; t->in_buf_bytes = 0; t->curr_rec = NULL; } else { ib = &sp->in_buf[t->curr_in_buf_index % sp->num_in_bufs]; t->in_buf = ib->in_buf; t->in_buf_bytes = ib->in_buf_bytes; t->curr_rec = t->in_buf; } pthread_mutex_unlock(&sp->sump_mtx); } /* is_more_input - internal routine to test if there is more input for * this pump task. This routine is called after the * pump function returns to see if there is additonal * input data for the task and hence the pump function * should be called again. */ static int is_more_input(sp_task_t t) { if (t->input_eof) /* if input eof, then false (no more input) */ return (0); /* if we are past the first input buffer for this task (because we * had to read the remainder of the last task record at the * beginning of the second input buffer), then we have hit the end * of this task's input. 
Note that if GROUP_BY is specified, * then the task pump function can read past any record remainder * at the beginning of the second input buffer. */ if (!t->first_in_buf && !(t->sp->flags & SP_GROUP_BY)) { done_reading_in_buf(t, t->curr_rec == t->in_buf + t->in_buf_bytes); t->input_eof = TRUE; return (0); } /* if there are bytes left in the current input buffer, then true * (more input). */ TRACE("pump: imi, rec %08x, buf %08x, bytes: %x\n", t->curr_rec, t->in_buf, t->in_buf_bytes); if (t->curr_rec < t->in_buf + t->in_buf_bytes) return (1); /* this task is done reading its current input buffer */ done_reading_in_buf(t, TRUE); /* if we are not grouping by key difference, then we have hit the * end of this task's input. */ if (!(t->sp->flags & SP_GROUP_BY)) { t->input_eof = TRUE; return (0); } /* get next input buffer */ ready_in_buf(t); TRACE("pump: imi, eof %d\n", t->input_eof); return (!t->input_eof); } /* pfunc_get_rec - get a pointer to the next input record for a pump function. * The sump pump infrastructure allocates the record buffers * and modifies the pointer-to-a-pointer argument to point to * the buffer. If the record type is text, a null character * will terminate the record. * * Returns: 0 if no more records are in the sump pump task input, otherwise * the number bytes in the record not including the terminating null * character. */ size_t pfunc_get_rec(sp_task_t t, void *ptr_to_rec_ptr) { void *buf; char *rec; char *next_rec; int match_char; size_t len; size_t src_size; size_t trans_size; size_t delim_size = 0; sp_t sp = t->sp; int new_key_group_beginning = FALSE; buf = t->rec_buf; if (t->input_eof) return 0; if (REC_TYPE(sp) == SP_UTF_8) delim_size = 1; /* if we have used all the records in the current input buffer... * note that we must be on a record boundry at this point. 
*/ if (t->curr_rec >= t->in_buf + t->in_buf_bytes) { done_reading_in_buf(t, TRUE); /* done reading input buffer */ /* if we are not grouping records then return no record (0) since * we are on a record boundry. */ if (!(sp->flags & SP_GROUP_BY)) { t->input_eof = TRUE; return 0; } ready_in_buf(t); if (t->input_eof) return 0; } /* now we have at least a partial record in the input buffer */ rec = t->curr_rec; /* if there is an initial key offset character */ if (sp->flags & SP_GROUP_BY) { match_char = rec[0]; if (match_char != '0' && match_char != '1') { pfunc_error(t, "-group was specified, but an input record does not start with\n" "'0' or '1'. Was nsort -match used to generate input?\n"); /* return 0; don't return EOF as this can result in infinite loop*/ } rec++; new_key_group_beginning = (match_char == '0' && !t->first_group_rec); } /* if the key offset indicates this is the beginning of a new key group && * this is not the actual first record for this current key group. */ if (!(sp->flags & SP_GROUP_BY) || new_key_group_beginning) { /* if we are beyond the first input buffer for this task, this * rec is not only the beginning of the next key group, it also * means we have reached the end of the input for this sump pump * task. */ if (!t->first_in_buf) { done_reading_in_buf(t, FALSE); TRACE("pump%d: eof: match: %c at ib %d, curr_rec %08x\n", t->thread_index, (sp->flags & SP_GROUP_BY) ? match_char : '0', t->curr_in_buf_index, t->curr_rec); t->input_eof = TRUE; /* no need to be inside the mutex because * only the current thread reads this */ return 0; } /* * else we are still reading from the first input buffer for this * task, but there is at least one more record key group to be * processed by this task. 
*/ else if (new_key_group_beginning) return 0; /* else we are not doing key grouping, go on to return next rec */ } t->first_group_rec = FALSE; /* only relevant for key grouping */ /* now copy the record, which may span multiple in_bufs, into the * caller's buffer. */ len = 0; for (;;) { src_size = t->in_buf_bytes - (rec - t->in_buf); switch (REC_TYPE(sp)) { case SP_UTF_8: next_rec = memchr(rec, *(char *)sp->delimiter, src_size); if (next_rec != NULL) { next_rec++; /* skip over newline */ trans_size = next_rec - rec; } else trans_size = src_size; break; case SP_FIXED: trans_size = src_size; if (trans_size >= sp->rec_size - len) { trans_size = sp->rec_size - len; next_rec = rec + trans_size; } else next_rec = NULL; break; } if (trans_size + len + delim_size > t->rec_buf_size) { size_t new_size; if (REC_TYPE(sp) == SP_FIXED) new_size = sp->rec_size; else new_size = trans_size + len + delim_size + 50; if (t->rec_buf == NULL) t->rec_buf = (char *)calloc(1, new_size); else t->rec_buf = realloc(t->rec_buf, new_size); if (t->rec_buf == NULL) { die("pfunc_get_rec: rec_buf increase failed: old %d new %d\n", t->rec_buf_size, new_size); } t->rec_buf_size = new_size; buf = t->rec_buf; } memmove((char *)buf + len, rec, trans_size); len += trans_size; if (next_rec != NULL) /* if we found the newline delimiter */ { if (REC_TYPE(sp) == SP_UTF_8) ((char *)buf)[len] = '\0'; break; } else /* else we need to get remainder of record from next buffer */ { done_reading_in_buf(t, TRUE); ready_in_buf(t); if (t->input_eof) { if (REC_TYPE(sp) == SP_FIXED) { die("pfunc_get_rec: partial record of " "%d bytes found at end of input\n", len); } return 0; } rec = t->curr_rec; } } t->curr_rec = next_rec; *(void **)ptr_to_rec_ptr = buf; return len; } /* pfunc_get_in_buf - get a pointer to the input buffer for a pump function. 
*/
int pfunc_get_in_buf(sp_task_t t, void **buf, size_t *size)
{
    /* hand the caller everything from the task's current position to the
     * end of its current input buffer, then mark all of it as consumed.
     */
    *(char **)buf = t->curr_rec;
    *size = (t->in_buf + t->in_buf_bytes) - t->curr_rec;
    t->curr_rec = t->in_buf + t->in_buf_bytes;
    return (0);
}


/* pfunc_get_out_buf - get a pointer to an output buffer and its size for
 *                     a pump function.
 *
 * If the output buffer is completely full, this routine marks the output
 * as stalled, wakes the output reader and waits until the stall indicator
 * is cleared or an error is raised.
 *
 * Returns: SP_OK or a sump pump error code.  If out_index is not a valid
 *          output index, a pump function error is raised, *buf is set to
 *          NULL, *size is set to 0 and the error code is returned.  -1 is
 *          returned if an error is raised while waiting for the output
 *          buffer to be emptied.
 */
int pfunc_get_out_buf(sp_task_t t, unsigned out_index, void **buf, size_t *size)
{
    sp_t                sp = t->sp;
    struct task_out     *out;

    if (out_index >= sp->num_outputs)
    {
        /* do not touch t->out[out_index], it is beyond the end of the
         * task's output array.
         */
        *buf = NULL;
        *size = 0;
        return (pfunc_error(t,
                            "pfunc_get_out_buf: out_index %d >= num_outputs %d\n",
                            out_index, sp->num_outputs));
    }
    out = t->out + out_index;

    if (out->bytes_copied == out->size)
    {
        TRACE("pfunc_get_out_buf: waking output reader\n");
        pthread_mutex_lock(&sp->sump_mtx);
        out->stalled = TRUE;
        pthread_cond_broadcast(&sp->task_output_ready_cond);
        TRACE("pfunc_get_out_buf: waiting for available output buffer\n");
        while (out->stalled && sp->error_code == 0)
            pthread_cond_wait(&sp->task_output_empty_cond, &sp->sump_mtx);
        pthread_mutex_unlock(&sp->sump_mtx);
        if (sp->error_code != 0)
            return (-1);
    }

    /* return the unused remainder of the output buffer */
    *(char **)buf = out->buf + out->bytes_copied;
    *size = out->size - out->bytes_copied;
    return (0);
}


/* pfunc_put_out_buf_bytes - flush bytes that have been placed in a pump
 *                           function's output buffer.  This routine can only
 *                           be used by first calling pfunc_get_out_buf().
* * Returns: SP_OK or a sump pump error code p */ int pfunc_put_out_buf_bytes(sp_task_t t, unsigned out_index, size_t size) { sp_t sp = t->sp; struct task_out *out = t->out + out_index; if (out_index >= sp->num_outputs) { pfunc_error(t, "pfunc_put_out_buf_bytes: " "out_index %d >= num_outputs %d\n", out_index, sp->num_outputs); } else if (out->bytes_copied + size > out->size) { pfunc_error(t, "sp_put_out_buf_bytes: " "aggregate size (%d+%d) is larger than buf size %d\n", out->bytes_copied, size, out->size); } else out->bytes_copied += size; return (sp->error_code); } /* pfunc_printf - print a formatted string to a pump functions's output. * * Returns: the number of bytes written. If this is not the same as the * requested size, an error has occurred. */ int pfunc_printf(sp_task_t t, unsigned out_index, const char *fmt, ...) { va_list ap; ssize_t ret; va_start(ap, fmt); ret = vsnprintf(t->temp_buf, t->temp_buf_size, fmt, ap); va_end(ap); #if defined(win_nt) if (ret == -1) /* non-standard vsnprintf overflow indicator on Windows */ { va_start(ap, fmt); ret = _vscprintf(fmt, ap); va_end(ap); } #endif if ((size_t)ret >= t->temp_buf_size) { /* temp buf wasn't big enough. enlarge it and redo */ if (t->temp_buf_size != 0) free(t->temp_buf); t->temp_buf_size = ret + 10; t->temp_buf = (char *)malloc(t->temp_buf_size); va_start(ap, fmt); ret = vsnprintf(t->temp_buf, t->temp_buf_size, fmt, ap); va_end(ap); } return ((int)pfunc_write(t, out_index, t->temp_buf, ret)); } /* pfunc_error - raise an error for a pump function and define an error * string that can be retrived by other threads using * sp_get_error_string(). * * Returns: SP_PUMP_FUNCTION_ERROR */ int pfunc_error(sp_task_t t, const char *fmt, ...) 
{ va_list ap; int ret; if (t->error_code != 0) /* if prior error */ return SP_PUMP_FUNCTION_ERROR; /* ignore this one */ t->error_code = SP_PUMP_FUNCTION_ERROR; va_start(ap, fmt); ret = vsnprintf(t->error_buf, t->error_buf_size, fmt, ap); va_end(ap); #if defined(win_nt) if (ret == -1) /* non-standard vsnprintf overflow indicator on Windows */ { va_start(ap, fmt); ret = _vscprintf(fmt, ap); va_end(ap); } #endif if ((size_t)ret >= t->error_buf_size) { if (t->error_buf_size != 0) free(t->error_buf); t->error_buf_size = (size_t)ret + 1; t->error_buf = (char *)malloc(t->error_buf_size); va_start(ap, fmt); vsnprintf(t->error_buf, t->error_buf_size, fmt, ap); va_end(ap); } return SP_PUMP_FUNCTION_ERROR; } /* pump_thread_main - the internal "main" routine of a sump pump thread. */ static void *pump_thread_main(void *arg) { sp_task_t t; unsigned thread_index; sp_t sp = (sp_t)arg; int ret; for (thread_index = 0; thread_index < sp->num_threads; thread_index++) #if defined(win_nt) if (sp->thread[thread_index].id == GetCurrentThreadId()) #else if (sp->thread[thread_index] == pthread_self()) #endif break; TRACE("pump thread %d starting\n", thread_index); for (;;) { TRACE("pump%d: waiting for an available task\n", thread_index); pthread_mutex_lock(&sp->sump_mtx); while (sp->cnt_task_begun == sp->cnt_task_init && sp->error_code == 0 && sp->input_eof == FALSE) { pthread_cond_wait(&sp->task_avail_cond, &sp->sump_mtx); } if (sp->error_code != 0 || (sp->input_eof == TRUE && sp->cnt_task_begun == sp->cnt_task_init)) { pthread_mutex_unlock(&sp->sump_mtx); TRACE("pump%d: breaking out of for loop: error_code: %d, input_eof %d\n", thread_index, sp->error_code, sp->input_eof); break; } t = &sp->task[sp->cnt_task_begun % sp->num_tasks]; sp->cnt_task_begun++; t->thread_index = thread_index; pthread_mutex_unlock(&sp->sump_mtx); TRACE("pump%d: calling pump func with %d input bytes\n", thread_index, (int)t->in_buf_bytes); if (REC_TYPE(sp) == SP_WHOLE_BUF) { TRACE("pump%d: calling pump func() 
block\n", thread_index); ret = (*sp->pump_func)(t, sp->pump_arg); TRACE("pump%d: pump func returned %d\n", thread_index, ret); if (ret) { if (t->error_code == 0) t->error_code = ret; } else { done_reading_in_buf(t, TRUE); t->input_eof = TRUE; } } else { while (is_more_input(t) && t->error_code == 0) { TRACE("pump%d: calling pump func()\n", thread_index); /* indicate first record in key group not yet read */ t->first_group_rec = TRUE; ret = (*sp->pump_func)(t, sp->pump_arg); TRACE("pump%d: pump func returned %d, input_eof: %d\n", thread_index, ret, t->input_eof); if (ret && t->error_code == 0) t->error_code = ret; } } TRACE("pump%d: pump_func returns with %d out[0] bytes\n", thread_index, t->out[0].bytes_copied); pthread_mutex_lock(&sp->sump_mtx); if (t->error_code && sp->error_code == 0) { sp->error_code = t->error_code; if (sp->error_buf != NULL) free(sp->error_buf); sp->error_buf = t->error_buf; t->error_buf = NULL; broadcast_all_conds(sp); } pthread_mutex_unlock(&sp->sump_mtx); TRACE("pump%d: waking output reader\n", thread_index); TRACE("pump%d: waking input writer\n", thread_index); pthread_mutex_lock(&sp->sump_mtx); t->output_eof = TRUE; pthread_cond_broadcast(&sp->task_output_ready_cond); pthread_mutex_unlock(&sp->sump_mtx); /* NOTA BENE: do not use "t" pointer after this point since the * struct that it points to can be reused immediately */ } TRACE("pump%d: exiting\n", thread_index); return (NULL); } /* sp_read_output - read bytes from the specified output of a sump pump. * * Returns: The number of bytes read. If 0, then EOF has occurred. * If negative, an error has occurred. 
*/
ssize_t sp_read_output(sp_t sp, unsigned index, void *buf, ssize_t size)
{
    int                 out_eof = FALSE;
    ssize_t             bytes_returned = 0;
    ssize_t             src_remaining;
    ssize_t             dst_remaining;
    char                *trans_src;
    char                *trans_dst;
    sp_task_t           t;
    struct task_out     *out;

    TRACE("sp_read_output[%d]: buf %08x, size %d\n", index, buf, size);

    if (sp->error_code)
        return (-1);

#if !defined(SUMP_PUMP_NO_SORT)
    if (sp->flags & SP_SORT)
    {
        int     ret;

        /* wait for the sort to leave its input phase */
        if (sp->sort_state == SORT_INPUT)
        {
            pthread_mutex_lock(&sp->sump_mtx);
            while (sp->error_code == 0 && sp->sort_state == SORT_INPUT)
                pthread_cond_wait(&sp->task_output_ready_cond, &sp->sump_mtx);
            pthread_mutex_unlock(&sp->sump_mtx);
            if (sp->error_code != 0)
                return (-1);
        }
        if (sp->sort_state == SORT_DONE)
            return (0);

        while (bytes_returned < size)
        {
            /* if there is no data in the sort_temp_buf, then fill it.
             */
            if (sp->out[0].partial_bytes_copied == 0)
            {
                /* call nsort_return_recs() until it does NOT return a buffer
                 * too small error.
                 */
                for (;;)
                {
                    char        *bigger;

                    sp->sort_temp_buf_bytes = sp->sort_temp_buf_size;
                    ret = (*Nsort_return_recs)(sp->sort_temp_buf,
                                               &sp->sort_temp_buf_bytes,
                                               &sp->nsort_ctx);
                    if (ret != NSORT_RETURN_BUF_SMALL)
                        break;

                    /* double the temp buffer.  the old contents need not be
                     * kept.  the old buffer and size are left in place if
                     * the allocation fails.
                     */
                    bigger = (char *)malloc(sp->sort_temp_buf_size * 2);
                    if (bigger == NULL)
                    {
                        sp->error_code = SP_MEM_ALLOC_ERROR;
                        return (-1);
                    }
                    free(sp->sort_temp_buf);
                    sp->sort_temp_buf = bigger;
                    sp->sort_temp_buf_size *= 2;
                }
                if (ret == NSORT_END_OF_OUTPUT)
                {
                    pthread_mutex_lock(&sp->sump_mtx);
                    sp->sort_state = SORT_DONE;
                    pthread_cond_broadcast(&sp->task_output_ready_cond);
                    pthread_mutex_unlock(&sp->sump_mtx);
                    return (bytes_returned);
                }
                else if (ret < 0)
                {
                    post_nsort_error(sp, ret);
                    return (-1);
                }
            }

            src_remaining =
                sp->sort_temp_buf_bytes - sp->out[0].partial_bytes_copied;
            dst_remaining = size - bytes_returned;
            trans_src = sp->sort_temp_buf + sp->out[0].partial_bytes_copied;
            trans_dst = (char *)buf + bytes_returned;
            if (dst_remaining <= src_remaining)
            {
                /* we will fill the caller's buffer completely.
                 */
                memmove(trans_dst, trans_src, dst_remaining);
                bytes_returned += dst_remaining;
                sp->out[0].partial_bytes_copied += dst_remaining;
                break;
            }

            /* the sort temp buffer will be completely copied out,
             * and we still need more.
             */
            memmove(trans_dst, trans_src, src_remaining);
            bytes_returned += src_remaining;

            /* indicate sort temp output buffer needs to be refilled */
            sp->out[0].partial_bytes_copied = 0;
        }
        return (bytes_returned);
    }
#endif

    if (index >= sp->num_outputs)
        return (-1);

    for (;;)
    {
        TRACE("sp_read_output: out[%d].cnt_task_drained: %d, "
              "partial_bytes_copied: %d\n",
              index,
              sp->out[index].cnt_task_drained,
              sp->out[index].partial_bytes_copied);
        /* the oldest task whose output has not been drained for this index */
        t = &sp->task[sp->out[index].cnt_task_drained % sp->num_tasks];
        out = t->out + index;

        /* if we aren't in the middle of copy out a task's buffer, then
         * we must potentially wait for the output of the next task.
         */
        if (sp->out[index].partial_bytes_copied == 0)
        {
            TRACE("sp_read_output: waiting for input\n");
            pthread_mutex_lock(&sp->sump_mtx);
            /* while 1) a sump pump error hasn't occurred.
             *   and 2) it's not the case that
             *          a) EOF on input has been reached
             *          b) all initialized tasks have begun (been taken), and
             *          c) all taken tasks have had their output read, and
             *   and 3) it's not the case oldest sump pump task is either
             *          done or stalled
             */
            while (sp->error_code == 0 &&
                   !(out_eof =
                     (sp->input_eof &&
                      sp->cnt_task_init == sp->cnt_task_begun &&
                      sp->cnt_task_begun == sp->out[index].cnt_task_drained)) &&
                   !(sp->out[index].cnt_task_drained < sp->cnt_task_begun &&
                     (out->stalled || t->output_eof)))
            {
                TRACE("sp_read_output: waiting\n");
                TRACE("sp_read_output:     cnt_task_init: %d\n",
                      sp->cnt_task_init);
                TRACE("sp_read_output:     cnt_task_begun: %d\n",
                      sp->cnt_task_begun);
                TRACE("sp_read_output:     out[%d].cnt_task_drained: %d\n",
                      index, sp->out[index].cnt_task_drained);
                TRACE("sp_read_output:     out->bytes_copied: %d\n",
                      out->bytes_copied);
                TRACE("sp_read_output:     t->in_buf_bytes: %d\n",
                      t->in_buf_bytes);
                TRACE("sp_read_output:     out->stalled: %d\n", out->stalled);
                TRACE("sp_read_output:     t->output_eof: %d\n",
                      t->output_eof);
                pthread_cond_wait(&sp->task_output_ready_cond, &sp->sump_mtx);
            }
            TRACE("sp_read_output: DONE WAITING\n");
            TRACE("sp_read_output:     error_code: %d\n", sp->error_code);
            TRACE("sp_read_output:     cnt_task_init: %d\n",
                  sp->cnt_task_init);
            TRACE("sp_read_output:     cnt_task_begun: %d\n",
                  sp->cnt_task_begun);
            TRACE("sp_read_output:     out[%d].cnt_task_drained: %d\n",
                  index, sp->out[index].cnt_task_drained);
            TRACE("sp_read_output:     out->bytes_copied: %d\n",
                  out->bytes_copied);
            TRACE("sp_read_output:     t->in_buf_bytes: %d\n",
                  t->in_buf_bytes);
            TRACE("sp_read_output:     out->stalled: %d\n", out->stalled);
            TRACE("sp_read_output:     t->output_eof: %d\n", t->output_eof);
            TRACE("sp_read_output:     out_eof: %d\n", out_eof);
            pthread_mutex_unlock(&sp->sump_mtx);
            if (sp->error_code || out_eof)
                break;
        }

        src_remaining =
            out->bytes_copied - sp->out[index].partial_bytes_copied;
        dst_remaining = size - bytes_returned;
        trans_src = t->out[index].buf +
            sp->out[index].partial_bytes_copied;
        trans_dst = (char *)buf + bytes_returned;
        if (dst_remaining < src_remaining)
        {
            /* we will fill the output buffer and still leave some bytes
             * in the sump pump task's output buffer.
             */
            memmove(trans_dst, trans_src, dst_remaining);
            bytes_returned += dst_remaining;
            sp->out[index].partial_bytes_copied += dst_remaining;
            break;
        }

        /* the sump pump task's output buffer will be completely copied out.
         */
        memmove(trans_dst, trans_src, src_remaining);
        bytes_returned += src_remaining;

        /* indicate new sump pump task output is needed */
        sp->out[index].partial_bytes_copied = 0;

        TRACE("sp_read_output: waking reader thread\n");
        pthread_mutex_lock(&sp->sump_mtx);
        if (t->out[index].stalled)
        {
            /* we have copied the bytes in the buf.  clear the buf
             * and stall indicator, then wake stalled thread.  since
             * there may be more than one thread waiting on the
             * task_output_empty_cond, use broadcast.
             */
            t->out[index].bytes_copied = 0;
            t->out[index].stalled = FALSE;
            pthread_cond_broadcast(&sp->task_output_empty_cond);
        }
        else
        {
            /* increment per-output task output drained count */
            sp->out[index].cnt_task_drained++;
            TRACE("sp_read_output: sp->out[%d].cnt_task_drained incr to %d\n",
                  index, sp->out[index].cnt_task_drained);

            TRACE("sp_read_output: t->outs_drained before incr is: %d\n",
                  t->outs_drained);
            /* if all output buffers for this task have been drained */
            if (++t->outs_drained == sp->num_outputs)
            {
                /* increment sump pump task drained count */
                sp->cnt_task_drained++;
                TRACE("sp_read_output: sp->cnt_task_drained incr to: %d\n",
                      sp->cnt_task_drained);
                pthread_cond_broadcast(&sp->task_drained_cond);
            }
        }
        pthread_mutex_unlock(&sp->sump_mtx);

        if (bytes_returned == size)
            break;
    }
    if (sp->error_code)
        bytes_returned = -1;
    TRACE("sp_read_output: returning %d bytes\n", bytes_returned);
    return (bytes_returned);
}


/* link_main - internal "main" routine for a thread that links an output
 *             of a sump pump to the input of another sump pump.
*/
static void *link_main(void *arg)
{
    sp_link_t   sp_link = (sp_link_t)arg;
    ssize_t     size;

    /* copy the upstream output to the downstream input until EOF (0) or
     * an error (negative) is returned by the upstream sump pump.
     */
    while ((size = sp_read_output(sp_link->out_sp, sp_link->out_index,
                                  sp_link->buf, sp_link->buf_size)) > 0)
    {
        if (sp_write_input(sp_link->in_sp, sp_link->buf, size) != size)
        {
            TRACE("link_main: sp_write_input() returned wrong size\n");
            sp_link->error_code = SP_WRITE_ERROR;
            return (NULL);
        }
    }
    /* pass the final read result (0 or negative) on to the downstream
     * sump pump.
     */
    sp_write_input(sp_link->in_sp, NULL, size);/* need to handle err case?*/
    if (size < 0)
        sp_link->error_code = SP_UPSTREAM_ERROR;
    return (NULL);
}


/* sp_link - start a link/connection between an output
 *           of a sump pump to the input of another sump pump.
 *
 * A thread running link_main() is created to copy data across the link.
 * The link structure and its buffer are handed to that thread; they are
 * released here only if the link cannot be set up.
 *
 * Returns: SP_OK or a sump pump error code
 */
int sp_link(sp_t out_sp, unsigned out_index, sp_t in_sp)
{
    struct sp_link      *link;
    int                 ret;
    int                 group_by_input;
    int                 match_keys_output;

    /* the upstream sump pump must produce key match indicators if and only
     * if the downstream sump pump groups by them.  compare truth values so
     * that any non-zero match_keys counts as set.  this is checked before
     * anything is allocated so that there is nothing to release on failure.
     */
    group_by_input = (in_sp->flags & SP_GROUP_BY) ? TRUE : FALSE;
    match_keys_output = out_sp->match_keys ? TRUE : FALSE;
    if (match_keys_output != group_by_input)
        return (SP_GROUP_BY_MISMATCH);

    link = (struct sp_link *)calloc(1, sizeof(struct sp_link));
    if (link == NULL)
        return (SP_MEM_ALLOC_ERROR);

    link->out_sp = out_sp;
    link->out_index = out_index;
    link->in_sp = in_sp;
    link->buf_size = 4096;
    link->buf = (char *)malloc(link->buf_size);
    if (link->buf == NULL)
    {
        free(link);     /* don't leak the link structure */
        return (SP_MEM_ALLOC_ERROR);
    }
    if ((ret = pthread_create(&link->thread, NULL, link_main, link)))
        die("sp_start_link: pthread_create() ret: %d\n", ret);
    return (SP_OK);
}


/* get_output_index - internal routine to read an output index preceded by
 *                    a "[" and followed by "]=".
*/ static int get_output_index(sp_t sp, char **caller_p) { int index; char *p = *caller_p; if (*p++ != '[') { syntax_error(sp, p, "expected '['"); return 0; } index = (int)get_numeric_arg(sp, &p); if (index < 0 || (unsigned)index >= sp->num_outputs) { syntax_error(sp, p, "output index is greater than the number of outputs"); return 0; } if (*p++ != ']') { syntax_error(sp, p, "expected ']'"); return 0; } if (*p++ != '=') { syntax_error(sp, p, "expected '='"); return 0; } *caller_p = p; return (index); } /* get_logical_processor_count - internal routine to get the number of * logical processors in the system. */ static int get_logical_processor_count() { #if defined(win_nt) { SYSTEM_INFO si; GetSystemInfo(&si); return si.dwNumberOfProcessors; } #else return sysconf(_SC_NPROCESSORS_ONLN); #endif } /* get_string_arg - internal routine to scan and return a string */ static char *get_string_arg(char **caller_p) { char *begin_p = *caller_p; char *p; char *ret; int i; /* scan string up to the next white space character */ p = begin_p; while (!isspace(*(unsigned char *)p) && *p != '\0') p++; *caller_p = p; /* allocate space for the return string and copy contents to it */ ret = (char *)calloc(1, p - begin_p + 1); for (i = 0; i < p - begin_p; i++) ret[i] = begin_p[i]; ret[p - begin_p] = '\0'; return (ret); } /* sp_argv_to_str - bundle up the specified argv and return it as a string. * For instance if argc is 2, argv[0] is "TASKS=2" and * argv[1] is "THEADS=3", then return the string * "TASKS=2 THREADS=3". * * Returns: a string containing the command line arguments passed as function * arguments. The string should be free()'d when no longer needed. */ char *sp_argv_to_str(char *argv[], int argc) { int i; char *str = NULL; int n_chars; n_chars = 1; /* '\0' */ for (i = 0; i < argc; i++) n_chars += 1 + (int)strlen(argv[i]); /* ' ' + argv[i] */ str = (char *)calloc(sizeof(char), n_chars + 1); strcpy(str, argc == 0 ? 
"" : argv[0]); for (i = 1; i < argc; i++) { strcat(str, "\n"); strcat(str, argv[i]); } return (str); } /* get_exec_args - internal routine to get an external program name and its * arguments */ static void get_exec_args(sp_t sp, char **ep) { char *p, *begin; int i, cnt; #if defined(win_nt) int cmdlen; char *cmdline; for (p = *ep, cnt = 0; *p != '\0'; p++) if (*p == '\n') cnt++; /* count newlines */ sp->exec_argv = (char **)calloc(2, sizeof(char *)); if (sp->exec_argv == NULL) { sp->error_code = SP_MEM_ALLOC_ERROR; return; } /* allocate space for each character plus 2 double-quote chars per arg */ cmdline = sp->exec_argv[0] = (char *)calloc((p - *ep) + 2 * (cnt + 1), sizeof(char)); if (cmdline == NULL) { sp->error_code = SP_MEM_ALLOC_ERROR; return; } cmdlen = 0; #endif p = *ep; i = 0; cnt = 0; for (;;) { #if defined(win_nt) int need_quotes = FALSE; if (i != 0) cmdline[cmdlen++] = ' '; /* add space between args */ #else /* grow argv for non-Windows systems */ # define SP_ARGV_INCR 5 if (i == 0) { cnt = SP_ARGV_INCR; sp->exec_argv = (char **)calloc(cnt + 1, sizeof(char *)); } else if (i == cnt) { cnt += SP_ARGV_INCR; sp->exec_argv = (char **) realloc(sp->exec_argv, (cnt + 1) * sizeof(char *)); } if (sp->exec_argv == NULL) { sp->error_code = SP_MEM_ALLOC_ERROR; return; } #endif begin = p; while (*p != '\n' && *p != '\0') { #if defined(win_nt) if (*p == ' ') need_quotes = TRUE; #endif p++; } #if defined(win_nt) if (need_quotes) cmdline[cmdlen++] = '"'; /* begin double-quote */ memcpy(cmdline + cmdlen, begin, p - begin); cmdlen += p - begin; if (need_quotes) cmdline[cmdlen++] = '"'; /* end double-quote */ #else sp->exec_argv[i] = calloc(sizeof(char), p - begin + 1); if (sp->exec_argv[i] == NULL) { sp->error_code = SP_MEM_ALLOC_ERROR; return; } memcpy(sp->exec_argv[i], begin, p - begin); sp->exec_argv[i][begin - p] = '\0'; TRACE("argv[%d]: %s\n", i, sp->exec_argv[i]); #endif if (*p == '\n') p++; if (*p == '\0') break; i++; } *ep = p; #if defined(win_nt) cmdline[cmdlen] = 
'\0'; TRACE("cmdline: %s\n", cmdline); #endif } /* sp_start - Start a sump pump * * Parameters: * sp - Pointer to where to return newly allocated sp_t * identifier that will be used in as the first argument * to all subsequent sp_*() calls. * pump_func - Pointer to pump function that will be called by * multiple sump pump threads at once. If an external * program name is specified in the arg_fmt string, this * parameter can and must be NULL. * arg_fmt - Printf-format-like string that can be used to specify * the following sump pump directives: * -ASCII or -UTF_8 Input records are ascii/utf-8 * characters delimited by a newline * character. * -GROUP_BY or -GROUP Group input records for the purpose * of reducing them. The sump pump input * should be coming from an nsort * instance where the "-match" * directive has been declared. This * directive prevents records with * equal keys from being dispersed to * more than one sump pump task. * -IN=%s or -IN_FILE=%s Input file name for the sump pump * input. if not specified, the input * should be written into the sump pump * either by calls to sp_write_input() * or sp_start_link() * The input file name can be followed * by options that control the file * access mode and transfer size. * See sp_open_file_src() comments. * -IN_BUF_SIZE=%d{k,m,g} Overrides default input buffer * size (256kb). If a 'k', 'm' or 'g' * suffix is specified, the specified * size is multiplied by 2^10, 2^20 or * 2^30 respectively. * -IN_BUFS=%d Overrides default number of input * buffers (the number of tasks). * -OUT[%d]=%s or -OUT_FILE[%d]=%s The output file name for * the specified output index, or output * 0 if no index is specified. If not * defined, the output should be read * either by calls to sp_read_output() * or by sp_start_link(). * The output file name can be followed * by options that control the file * access mode and transfer size. * See sp_open_file_dst() comments. 
*        -OUT_BUF_SIZE[%d]=%d{x,k,m,g}  Overrides default output
 *                             buffer size (2x the input buf size)
 *                             for the specified output index, or
 *                             output 0 if no index is specified.
 *                             If the size ends with a suffix of
 *                             'x', the size is used as a multiplier
 *                             of the input buffer size. If a 'k',
 *                             'm' or 'g' suffix is specified, the
 *                             specified size is multiplied by 2^10,
 *                             2^20 or 2^30 respectively. It is not
 *                             an error if the output of a task
 *                             exceeds the output buffer size, but
 *                             it can potentially result in loss
 *                             of parallelism.
 *        -OUTPUTS=%d          Overrides default number of output
 *                             streams (1).
 *        -REC_SIZE=%d         Defines the input record size in
 *                             bytes. The record contents need not
 *                             be ascii nor delimited by a newline
 *                             character. If not specified, records
 *                             must consist of ascii or utf-8
 *                             characters and be terminated by a
 *                             newline.
 *        -TASKS=%d            Overrides default number of output
 *                             tasks (3x the number of threads).
 *        -THREADS=%d          Overrides default number of threads
 *                             that are used to execute the pump
 *                             function in parallel. The default is
 *                             the number of logical processors in
 *                             the system.
 *        -WHOLE or            Processing is not done by input
 *        -WHOLE_BUF           records so no input record type
 *                             should be defined.  Instead,
 *                             processing is done by whole input
 *                             buffers.
 *        -DEFAULT_FILE_MODE={BUFFERED,BUF,DIRECT,DIR}  Set the
 *                             default file access mode for both
 *                             input and output files.  If none is
 *                             specified, the direct mode is used
 *                             to access input and output files for
 *                             which a BUFFERED file modifier is
 *                             not specified.
 *        [external_program_name external_program_arguments]
 *                             The name of an external program and
 *                             its arguments.  The program name
 *                             cannot start with the '-' character.
 *                             The external program name and its
 *                             arguments must be last in the
 *                             arg_fmt string.  If the program name
 *                             does not include a path, the PATH
 *                             environment variable will be used
 *                             to find it.
 *      ...
potential subsequent arguments to arg_fmt * * Returns: SP_OK or a sump pump error code * * The sump pump directives can also appear in a SUMP_PUMP environment * variable. */ int sp_start(sp_t *caller_sp, sp_pump_t pump_func, char *arg_fmt, ...) { sp_t sp; char *s; unsigned i; unsigned j; int ret; char *p; int index; char *args; char err_buf[200]; if (TraceFp == NULL && (s = getenv("SUMP_PUMP_TRACE")) != NULL && strlen(s) > 0) { if (!strcmp(s, "")) TraceFp = stdout; else if (!strcmp(s, "")) TraceFp = stderr; else { TraceFp = fopen(s, "a+"); if (TraceFp == NULL) fprintf(stderr, "can't open SUMP_PUMP_TRACE=%s\n", s); else { time_t now = time(NULL); trace("begin new sump pump trace, %s", ctime(&now)); } } } *caller_sp = NULL; sp = (sp_t)calloc(1, sizeof(struct sump)); if (sp == NULL) return (SP_MEM_ALLOC_ERROR); sp->error_buf_size = ERROR_BUF_SIZE; sp->error_buf = (char *)calloc(1, sp->error_buf_size); if (sp->error_buf == NULL) return (SP_MEM_ALLOC_ERROR); *caller_sp = sp; /* fill in default parameters */ sp->pump_arg = NULL; sp->num_threads = get_logical_processor_count(); /* default thread count */ sp->num_in_bufs = 3 * sp->num_threads; sp->num_tasks = 3 * sp->num_threads; sp->in_buf_size = (1 << 18); sp->num_outputs = 1; sp->out = (struct sump_out *)calloc(1, sizeof(struct sump_out)); sp->out[0].buf_size = (1 << 18); sp->delimiter = (void *)"\n"; sp->rec_size = 0; sp->pump_func = pump_func; if ((p = getenv("SUMP_PUMP")) == NULL) { /* allocate minimal string */ args = (char *)calloc(1, 1); args[0] = '\0'; } else { args = (char *)calloc(strlen(p) + 2, 1); memcpy(args, p, strlen(p)); /* add newline to separate potential following commands */ args[strlen(p)] = '\n'; args[strlen(p) + 1] = '\0'; } if (arg_fmt != NULL) { va_list ap; size_t args_size; p = args; va_start(ap, arg_fmt); #if defined(win_nt) args_size = _vscprintf(arg_fmt, ap); #else args_size = vsnprintf(NULL, 0, arg_fmt, ap); #endif va_end(ap); args = (char *)calloc(strlen(p) + args_size + 1, 1); memcpy(args, 
p, strlen(p)); va_start(ap, arg_fmt); if (vsnprintf(args + strlen(p), args_size + 1, arg_fmt, ap) != args_size) { start_error(sp, "sp_start: " "vnsprintf failed to return %d\n", args_size); return (sp->error_code); } va_end(ap); free(p); /* free copy of environment string */ TRACE("sp_start args: '%s'\n", args); } for (p = args; ; ) { /* ignore leading white space chars */ while (isspace(*(unsigned char *)p)) p++; if (*p == '\0') break; if (*p++ != '-') { /* Must be the name of an external program, and possibly * some command line arguments for it. */ p--; sp->flags |= SP_EXEC; #if defined(SUMP_PIPE_STDERR) if (sp->num_outputs == 1) { sp->out = (struct sump_out *) realloc(sp->out, 2 * sizeof(struct sump_out)); if (sp->out == NULL) { start_error(sp, "set_num_outputs, realloc() failed\n"); return (sp->error_code); } memset(sp->out + 1, 0, sizeof(struct sump_out)); sp->out[1].buf_size = sp->out[0].buf_size; sp->num_outputs = 2; } #endif get_exec_args(sp, &p); if (sp->error_code) return (sp->error_code); } else if (scan("ASCII", &p) || scan("UTF_8", &p)) { sp->flags |= SP_UTF_8; } else if (scan("DEFAULT_FILE_MODE=", &p)) { if (scan("BUFFERED", &p) || scan("BUF", &p)) Default_file_mode = MODE_BUFFERED; else if (scan("DIRECT", &p) || scan("DIR", &p)) Default_file_mode = MODE_DIRECT; else syntax_error(sp, p, "unrecognized file access mode"); } else if (scan("GROUP_BY", &p) || scan("GROUP", &p)) sp->flags |= SP_GROUP_BY; else if (scan("IN_BUFS=", &p)) sp->num_in_bufs = (unsigned)get_numeric_arg(sp, &p); else if (scan("IN_BUF_SIZE=", &p)) { sp->in_buf_size = (ssize_t)get_numeric_arg(sp, &p); sp->in_buf_size *= (ssize_t)get_scale(&p); } else if (scan("IN_FILE=", &p) || scan("IN=", &p)) { /* get input file here */ sp->in_file = get_string_arg(&p); sp->in_file_alloc = TRUE; } else if (scan("OUT_BUF_SIZE", &p)) { size_t size; double incr; if (*p == '=') /* if no index in square brackets */ { index = 0; /* default to index 0 */ p++; } else index = get_output_index(sp, &p); if 
(*p == '.') size = 0; else size = (int)get_numeric_arg(sp, &p); if (*p == '.' || *p == 'x' || *p == 'X') { sp->out[index].buf_size_mult = (double)size; if (*p == '.') { p++; incr = 0.1; while (*p >= '0' && *p <= '9') { sp->out[index].buf_size_mult += (*p - '0') * incr; incr *= 0.1; p++; } } if (*p != 'x' && *p != 'X') { start_error(sp, "sp_start: " "out buf size factor must end with 'x'\n"); return (sp->error_code); } p++; sp->out[index].size_specified = FALSE; } else { sp->out[index].buf_size = size; sp->out[index].buf_size *= (size_t)get_scale(&p); sp->out[index].size_specified = TRUE; } } else if (scan("OUT_FILE", &p) || scan("OUT", &p)) { if (*p == '=') /* if no index in square brackets */ { index = 0; /* default to index 0 */ p++; } else index = get_output_index(sp, &p); if (index >= sp->num_outputs) { start_error(sp, "sp_start: " "output file index must be less than the " "number of output files\n"); return (sp->error_code); } sp->out[index].file = get_string_arg(&p); sp->out[index].file_alloc = TRUE; } else if (scan("OUTPUTS=", &p)) { unsigned num_outputs; num_outputs = (unsigned)get_numeric_arg(sp, &p); if (num_outputs > sp->num_outputs) { sp->out = (struct sump_out *)realloc(sp->out, num_outputs * sizeof(struct sump_out)); if (sp->out == NULL) { start_error(sp, "set_num_outputs, realloc() failed\n"); return (sp->error_code); } for (i = sp->num_outputs; i < num_outputs; i++) { memset(sp->out + i, 0, sizeof(struct sump_out)); sp->out[i].buf_size = sp->out[0].buf_size; } } sp->num_outputs = num_outputs; } else if (scan("WHOLE_BUF", &p) || scan("WHOLE", &p)) { sp->flags &= ~SP_UTF_8; sp->flags |= SP_WHOLE_BUF; } else if (scan("REC_SIZE=", &p)) { sp->rec_size = (int)get_numeric_arg(sp, &p); sp->flags &= ~SP_UTF_8; sp->flags |= SP_FIXED; if (sp->rec_size <= 0) { start_error(sp, "sp_start: " "REC_SIZE must be greater than 0\n"); return (sp->error_code); } } else if (scan("RW_TEST_SIZE=", &p)) { Default_rw_test_size = (size_t)get_numeric_arg(sp, &p); 
Default_rw_test_size *= (size_t)get_scale(&p); } else if (scan("TASKS=", &p)) { sp->num_in_bufs = sp->num_tasks = (unsigned)get_numeric_arg(sp, &p); } else if (scan("THREADS=", &p)) { int num_threads; num_threads = (int)get_numeric_arg(sp, &p); if (num_threads > 0) sp->num_threads = (unsigned)num_threads; } else syntax_error(sp, p, "unrecognized keyword"); if (sp->error_code) return (sp->error_code); } free(args); if (REC_TYPE(sp) == 0) { start_error(sp, "sp_start: a record type must be specified\n"); return (sp->error_code); } if (REC_TYPE(sp) == SP_UTF_8) { if (strlen((char *)sp->delimiter) > 1) { start_error(sp, "sp_start: " "currently only single-char delimiters are allowed\n"); return (sp->error_code); } } else if (REC_TYPE(sp) == SP_UNICODE) { start_error(sp, "sp_start: unicode records are currently not supported\n"); return (sp->error_code); } else if (REC_TYPE(sp) == SP_FIXED) { /* nothing for now */ } else if (REC_TYPE(sp) == SP_WHOLE_BUF) { /* nothing for now */ } else { start_error(sp, "sp_start: multiple record types specified\n"); return (sp->error_code); } for (i = 0; i < sp->num_outputs; i++) { /* if an output buffer absolute size has not been specified */ if (sp->out[i].size_specified == FALSE) { /* if an output buffer size multiplier was specified, use it */ if (sp->out[i].buf_size_mult != 0.0) sp->out[i].buf_size = (size_t)(sp->in_buf_size * sp->out[i].buf_size_mult + 0.5); else /* default to using 2x the input buffer size */ sp->out[i].buf_size = 2 * sp->in_buf_size; } TRACE("out %d: %d\n", i, (int)sp->out[i].buf_size); } /* alloc rec output buffers for each task */ sp->task = (sp_task_t)calloc(sp->num_tasks, sizeof(struct sp_task)); for (i = 0; i < sp->num_tasks; i++) { sp->task[i].out = (struct task_out *) calloc(sp->num_outputs, sizeof (struct task_out)); sp->task[i].error_buf_size = ERROR_BUF_SIZE; sp->task[i].error_buf = (char *)calloc(1, sp->task[i].error_buf_size); if (sp->task[i].error_buf == NULL) return (SP_MEM_ALLOC_ERROR); for (j = 0; 
j < sp->num_outputs; j++) { sp->task[i].out[j].buf = (void *)malloc(sp->out[j].buf_size); sp->task[i].out[j].size = sp->out[j].buf_size; } sp->task[i].sp = sp; } /* alloc input buffers */ sp->in_buf = (in_buf_t *)calloc(sp->num_in_bufs, sizeof(in_buf_t)); for (i = 0; i < sp->num_in_bufs; i++) { size_t buf_size; /* round up buf size to page size multiple */ buf_size = ((sp->in_buf_size + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE; #if defined(win_nt) sp->in_buf[i].in_buf = VirtualAlloc(NULL, buf_size, MEM_COMMIT, PAGE_READWRITE); if (sp->in_buf[i].in_buf == NULL) return (SP_MEM_ALLOC_ERROR); #else init_zero_fd(); sp->in_buf[i].in_buf = mmap(NULL, buf_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, Zero_fd, 0); if (sp->in_buf[i].in_buf == MAP_FAILED) return (SP_MEM_ALLOC_ERROR); #endif sp->in_buf[i].in_buf_size = sp->in_buf_size; sp->in_buf[i].alloc_size = buf_size; } if (sp->flags & SP_EXEC) { if (sp->pump_func != NULL) { start_error(sp, "can't both define a pump function and external program\n"); return (sp->error_code); } sp->ex_state = (struct exec_state *) calloc(sizeof(struct exec_state), sp->num_threads); /* use own internal pump function to pipe to/from external process */ sp->pump_func = pfunc_exec; #if !defined(win_nt) /* ignore broken pipe signal, failed write()'s return with error */ signal(SIGPIPE, SIG_IGN); #endif } else if (sp->pump_func == NULL) { start_error(sp, "an external program or pump function needs to be defined\n"); return (sp->error_code); } /* create mutexes and conditions */ pthread_mutex_init(&sp->sump_mtx, NULL); pthread_mutex_init(&sp->sp_mtx, NULL); pthread_cond_init(&sp->in_buf_readable_cond, NULL); pthread_cond_init(&sp->in_buf_done_cond, NULL); pthread_cond_init(&sp->task_avail_cond, NULL); pthread_cond_init(&sp->task_drained_cond, NULL); pthread_cond_init(&sp->task_output_ready_cond, NULL); pthread_cond_init(&sp->task_output_empty_cond, NULL); /* create thread sump threads */ sp->thread = (pthread_t *)calloc(sp->num_threads, 
sizeof(pthread_t)); for (i = 0; i < sp->num_threads; i++) { ret = pthread_create(&sp->thread[i], NULL, pump_thread_main, (void *)sp); if (ret) die("pthread_create() failed: %d\n", ret); if (sp->flags & SP_EXEC) { sp->ex_state[i].in.ex = &sp->ex_state[i]; sp->ex_state[i].out.ex = &sp->ex_state[i]; #if defined(win_nt) sp->ex_state[i].in.rd_h = sp->ex_state[i].in.wr_h = INVALID_HANDLE_VALUE; sp->ex_state[i].out.rd_h = sp->ex_state[i].out.wr_h = INVALID_HANDLE_VALUE; #else sp->ex_state[i].in.rd_fd = sp->ex_state[i].in.wr_fd = INVALID_FD; sp->ex_state[i].out.rd_fd = sp->ex_state[i].out.wr_fd = INVALID_FD; #endif #if defined(SUMP_PIPE_STDERR) sp->ex_state[i].err.ex = &sp->ex_state[i]; sp->ex_state[i].err.rd_fd = sp->ex_state[i].err.wr_fd = INVALID_FD; #endif } } for (i = 0; i < sp->num_outputs; i++) { if (sp->out[i].file != NULL) { /* start an output file connection */ sp->out[i].file_sp = sp_open_file_dst(sp, i, sp->out[i].file); if (sp->out[i].file_sp == NULL) { start_error(sp, "%s: %s\n", sp->out[i].file, get_error_msg(0, err_buf, sizeof(err_buf))); return (SP_FILE_OPEN_ERROR); } } } if (sp->in_file != NULL) { /* start an input file connection */ sp->in_file_sp = sp_open_file_src(sp, sp->in_file); if (sp->in_file_sp == NULL) { start_error(sp, "%s: %s\n", sp->in_file, get_error_msg(0, err_buf, sizeof(err_buf))); return (SP_FILE_OPEN_ERROR); } } return (SP_OK); } /* sp_get_error_string - return an error message string * * Returns: a string containing a sump pump error message. The string should * NOT be free()'d and is valid until the passed sp_t is sp_free()'d. 
*/ const char *sp_get_error_string(sp_t sp, int error_code) { const char *err_code_str; if (sp != NULL && sp->error_buf[0] != '\0') return (sp->error_buf); if (error_code == 0 && sp != NULL) error_code = sp->error_code; switch (error_code) { case SP_FILE_READ_ERROR: err_code_str = "SP_FILE_READ_ERROR: file read error"; break; case SP_FILE_WRITE_ERROR: err_code_str = "SP_FILE_WRITE_ERROR: file write error"; break; case SP_UPSTREAM_ERROR: err_code_str = "SP_UPSTREAM_ERROR: upstream error"; break; case SP_REDUNDANT_EOF: err_code_str = "SP_REDUNDANT_EOF: redundant eof"; break; case SP_MEM_ALLOC_ERROR: err_code_str = "SP_MEM_ALLOC_ERROR: memory allocation error"; break; case SP_FILE_OPEN_ERROR: err_code_str = "SP_FILE_OPEN_ERROR: file open error"; break; case SP_WRITE_ERROR: err_code_str = "SP_WRITE_ERROR: write error"; break; case SP_SORT_DEF_ERROR: err_code_str = "SP_SORT_DEF_ERROR: sort definition error"; break; case SP_SORT_EXEC_ERROR: err_code_str = "SP_SORT_EXEC_ERROR: sort execution error"; break; case SP_GROUP_BY_MISMATCH: err_code_str = "SP_GROUP_BY_MISMATCH: group-by mismatch"; break; case SP_SORT_INCOMPATIBLE: err_code_str = "SP_SORT_INCOMPATIBLE: a sort sump pump is incompatible" " with buffer-at-a-time i/o, specify the input or" " output file as a sort parameter instead"; break; case SP_BUF_INDEX_ERROR: err_code_str = "SP_BUF_INDEX_ERROR: the specified buffer index is" " out-of-range"; break; case SP_OUTPUT_INDEX_ERROR: err_code_str = "SP_OUTPUT_INDEX_ERROR: the specified output index is" " equal to or larger than the number of outputs"; break; case SP_START_ERROR: err_code_str = "SP_START_ERROR: an error occured during sp_start()"; break; case SP_NSORT_LINK_FAILURE: err_code_str = "SP_NSORT_LINK_FAILURE: link attempt to nsort library failed. 
" "Is nsort installed?"; break; case SP_SORT_NOT_COMPILED: err_code_str = "SP_SORT_NOT_COMPILED: sump.c has not been compiled to support " "sorting"; break; case SP_PUMP_FUNCTION_ERROR: err_code_str = "Pump function error"; break; default: err_code_str = "Unknown Error"; } return (err_code_str); } /* sp_free - free the specified sp_t and its associated state. */ void sp_free(sp_t *caller_sp) { sp_t sp; int i, j; sp = *caller_sp; if (sp == NULL) return; sp_wait(sp); /* make sure sump pump has finished */ #if !defined(SUMP_PUMP_NO_SORT) if (sp->flags & SP_SORT) { if (sp->sort_temp_buf != NULL) free(sp->sort_temp_buf); if (sp->nsort_ctx != 0) (*Nsort_end)(&sp->nsort_ctx); if (sp->out != NULL) free(sp->out); } else #endif { if (sp->task != NULL) { for (i = 0; i < sp->num_tasks; i++) { if (sp->task[i].out != NULL) { for (j = 0; j < sp->num_outputs; j++) if (sp->task[i].out[j].buf != NULL) free(sp->task[i].out[j].buf); free(sp->task[i].out); } if (sp->task[i].error_buf != NULL) free(sp->task[i].error_buf); } free(sp->task); } if (sp->in_buf != NULL) { for (i = 0; i < sp->num_in_bufs; i++) { if (sp->in_buf[i].in_buf != NULL) { #if defined(win_nt) VirtualFree(sp->in_buf[i].in_buf, sp->in_buf[i].alloc_size, MEM_RELEASE); #else munmap(sp->in_buf[i].in_buf, sp->in_buf[i].alloc_size); #endif } } free(sp->in_buf); } if (sp->ex_state != NULL) free(sp->ex_state); if (sp->thread != NULL) { pthread_mutex_destroy(&sp->sump_mtx); pthread_mutex_destroy(&sp->sp_mtx); pthread_cond_destroy(&sp->in_buf_readable_cond); pthread_cond_destroy(&sp->in_buf_done_cond); pthread_cond_destroy(&sp->task_avail_cond); pthread_cond_destroy(&sp->task_drained_cond); pthread_cond_destroy(&sp->task_output_ready_cond); pthread_cond_destroy(&sp->task_output_empty_cond); free(sp->thread); } if (sp->out != NULL) { for (i = 0; i < sp->num_outputs; i++) { if (sp->out[i].file_sp != NULL) sp_file_free(&sp->out[i].file_sp); if (sp->out[i].file_alloc && sp->out[i].file != NULL) free(sp->out[i].file); } 
free(sp->out); } if (sp->in_file_sp != NULL) sp_file_free(&sp->in_file_sp); if (sp->in_file_alloc && sp->in_file != NULL) free(sp->in_file); } if (sp->error_buf != NULL) free(sp->error_buf); free(sp); *caller_sp = NULL; } /* sp_file_free - free the specified sp_file_t and its associated state. */ void sp_file_free(sp_file_t *caller_sp_file) { sp_file_t sp_file; sp_file = *caller_sp_file; if (sp_file == NULL) return; if (sp_file->fname != NULL) free(sp_file->fname); free(sp_file); *caller_sp_file = NULL; }