/* sump.h - SUMP Pump(TM) SMP/CMP parallel data pump library * SUMP Pump is a trademark of Ordinal Technology Corp * * SUMP (Scale Up MultiProcessor) Pump is an executable program and * subroutine library that allows programmers to quickly take advantage * of multiple processors in a computer system. The sump pump program, * sump, can run multiple instances of a separate program to parallelize * its processing. The sump pump library allows programmers to write * subroutines that are automatically parallelized by the sump pump * infrastructure. * * $Revision: 122 $ * * Copyright (C) 2010 - 2011, Ordinal Technology Corp, http://www.ordinal.com * * This program is free software; you can redistribute it and/or * modify it under the terms of Version 2 of the GNU General Public * License as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * Linking SUMP Pump statically or dynamically with other modules is * making a combined work based on SUMP Pump. Thus, the terms and * conditions of the GNU General Public License v.2 cover the whole * combination. * * In addition, as a special exception, the copyright holders of SUMP Pump * give you permission to combine SUMP Pump program with free software * programs or libraries that are released under the GNU LGPL and with * independent modules that communicate with SUMP Pump solely through * Ordinal Technology Corp's Nsort Subroutine Library interface as defined * in the Nsort User Guide, http://www.ordinal.com/NsortUserGuide.pdf. * You may copy and distribute such a system following the terms of the * GNU GPL for SUMP Pump and the licenses of the other code concerned, * provided that you include the source code of that other code when and * as the GNU GPL requires distribution of source code. * * Note that people who make modified versions of SUMP Pump are not * obligated to grant this special exception for their modified * versions; it is their choice whether to do so. The GNU General * Public License gives permission to release a modified version without * this exception; this exception also makes it possible to release a * modified version which carries forward this exception. * * For more information on SUMP Pump, see: * http://www.ordinal.com/sump.html * http://code.google.com/p/sump-pump/ */ #ifndef _SUMP_PUMP_H_ #define _SUMP_PUMP_H_ #include #include #if defined(_WIN32) # define win_nt 1 typedef __int64 int64_t; typedef unsigned __int64 uint64_t; # include "sump_win.h" #else /* now non-Windows */ # include #endif /* sump pump type */ typedef struct sump *sp_t; /* sump pump param type */ typedef struct sp_param *sp_param_t; /* sump pump task type that is passed to pump functions */ typedef struct sp_task *sp_task_t; /* sump pump file type */ typedef struct sp_file *sp_file_t; /* sump pump link type */ typedef struct sp_link *sp_link_t; /* pump function type. this is the function that is executed * in parallel by multiple sump pump member threads */ typedef int (*sp_pump_t)(sp_task_t t, void *arg); /* SUMP Pump Library * ----------------- * * The sump pump library facilitates the automatic parallelization of * either (1) a programmer-defined function that reads input records and * writes output records (a pump function or pump func), or (2) an * external program that reads its standard input and writes to its * standard output. * * The sump pump API consists of functions that can be called by the * host program, and the functions that can be called by the pump * functions. */ /* Library Functions for Host Program * ---------------------------------- */ /* sp_get_version - get the subversion version for sump pump * * Returns: a string containing the subversion version. The string should * NOT be free()'d. */ const char *sp_get_version(void); /* sp_get_id - get the subversion id keyword substitution for sump pump * * Returns: a string containing the subversion id keyword. The string should * NOT be free()'d. */ const char *sp_get_id(void); /* sp_start - Start a sump pump * * Parameters: * sp - Pointer to where to return newly allocated sp_t * identifier that will be used in as the first argument * to all subsequent sp_*() calls. * pump_func - Pointer to pump function that will be called by * multiple sump pump threads at once. If an external * program name is specified in the arg_fmt string, this * parameter can and must be NULL. * arg_fmt - Printf-format-like string that can be used to specify * the following sump pump directives: * -ASCII or -UTF_8 Input records are ascii/utf-8 * characters delimited by a newline * character. * -GROUP_BY or -GROUP Group input records for the purpose * of reducing them. The sump pump input * should be coming from an nsort * instance where the "-match" * directive has been declared. This * directive prevents records with * equal keys from being dispersed to * more than one sump pump task. * -IN=%s or -IN_FILE=%s Input file name for the sump pump * input. if not specified, the input * should be written into the sump pump * either by calls to sp_write_input() * or sp_start_link() * The input file name can be followed * by options that control the file * access mode and transfer size. * See sp_open_file_src() comments. * -IN_BUF_SIZE=%d{k,m,g} Overrides default input buffer * size (256kb). If a 'k', 'm' or 'g' * suffix is specified, the specified * size is multiplied by 2^10, 2^20 or * 2^30 respectively. * -IN_BUFS=%d Overrides default number of input * buffers (the number of tasks). * -OUT[%d]=%s or -OUT_FILE[%d]=%s The output file name for * the specified output index, or output * 0 if no index is specified. If not * defined, the output should be read * either by calls to sp_read_output() * or by sp_start_link(). * The output file name can be followed * by options that control the file * access mode and transfer size. * See sp_open_file_dst() comments. * -OUT_BUF_SIZE[%d]=%d{x,k,m,g} Overrides default output * buffer size (2x the input buf size) * for the specified output index, or * output 0 if no index is specified. * If the size ends with a suffix of * 'x', the size is used as a multiplier * of the input buffer size. If a 'k', * 'm' or 'g' suffix is specified, the * specified size is multiplied by 2^10, * 2^20 or 2^30 respectively. It is not * an error if the output of a task * exceeds the output buffer size, but * it can potentially result in loss * of parallelism. * -OUTPUTS=%d Overrides default number of output * streams (1). * -REC_SIZE=%d Defines the input record size in * bytes. The record contents need not * be ascii nor delimited by a newline * character. If not specified, records * must consist of ascii or utf-8 * characters and be terminated by a * newline. * -TASKS=%d Overrides default number of output * tasks (3x the number of threads). * -THREADS=%d Overrides default number of threads * that are used to execute the pump * function in parallel. The default is * the number of logical processors in * the system. * -WHOLE or Processing is not done by input * -WHOLE_BUF records so not input record type * should be defined. Instead, * processing is done by whole input * buffers. * -DEFAULT_FILE_MODE={BUFFERED,BUF,DIRECT,DIR} Set the * default file access mode for both * input and output files. If none is * specified, the direct mode is used * to access input and output files for * which a BUFFERED file modifier is * not specified. * [external_program_name external_program_arguments] * The name of an external program and * its arguments. The program name * cannot start with the '-' character. * The external program name and its * arguments must be last in the * arg_fmt string. If the program name * does not include a path, the PATH * environment variable will be used * to find it. * ... potential subsequent arguments to arg_fmt * * Returns: SP_OK or a sump pump error code * * The sump pump directives can also appear in a SUMP_PUMP environment * variable. */ int sp_start(sp_t *sp, sp_pump_t pump_func, char *arg_fmt, ...); /* sp_argv_to_str - bundle up the specified argv and return it as a string * for instance if argc is 2, argv[0] is "TASKS=2" and * argv[1] is "THEADS=3", then return the string * "TASKS=2 THREADS=3". * * Returns: a string containing the command line arguments passed as function * arguments. The string should be free()'d when no longer needed. */ char *sp_argv_to_str(char *argv[], int argc); /* sp_start_sort - start an nsort instance with a sump pump outside * wrapper so that its input or output can be assigned to * a file or linked to another sump pump. For instance, * the sort input or output can be linked to sump pumps * performing record pumping such as "map" on input * and "reduce" for output. * Parameters: * sp - Pointer to where to return newly allocated sp_t * identifier that will be used in as the first * argument to all subsequent sp_*() calls. * def - Nsort sort definition string. Besides the Nsort * commands listed in the Nsort User Guide, the following * directives are also recognized: * -match[=%d] Each output record will be preceded by a * single byte that indicates the * specified number of keys in this record * are the same as in the previous record. * If no key number is specified, all keys * are examined for a match condition. * * Returns: SP_OK or a sump pump error code */ int sp_start_sort(sp_t *sp, char *def_fmt, ...); /* sp_get_sort_stats - get a string containing the nsort statistics report * for an nsort sump pump that has completed. * * Returns: a string containing the Nsort statistics report. The string should * NOT be free()'d and is valid until the passed sp_t is sp_free()'d. */ const char *sp_get_sort_stats(sp_t sp); /* sp_get_nsort_version - get the subversion version for sump pump * * Returns: a string containing the Nsort version number. The string should * NOT be free()'d. */ const char *sp_get_nsort_version(void); /* sp_link - start a link/connection between an output * of a sump pump to the input of another sump pump. * * Returns: SP_OK or a sump pump error code */ int sp_link(sp_t out_sp, unsigned out_index, sp_t in_sp); /* sp_write_input - write data that is the input to a sump pump. * A write size of 0 indicates input EOF. * * Returns: the amount of data written. If this is not the same as the * requested write size, an error has occurred. */ ssize_t sp_write_input(sp_t sp, void *buf, ssize_t size); /* sp_get_in_buf - get a pointer to an input buffer that an external * thread can fill with input data. * * Returns: SP_OK or a sump pump error code */ int sp_get_in_buf(sp_t sp, uint64_t index, void **buf, size_t *size); /* sp_put_in_buf_bytes - flush bytes that have been placed in a sump * pump's input buffer by an external thread. * This function should only be used by first * calling sp_get_in_buf(). * * Returns: SP_OK or a sump pump error code */ int sp_put_in_buf_bytes(sp_t sp, uint64_t index, size_t size, int eof); /* sp_read_output - read bytes from a specified output of the specified * sump pump. * * Returns: The number of bytes read. If 0, then EOF has occurred. * If negative, an error has occurred. */ ssize_t sp_read_output(sp_t sp, unsigned index, void *buf, ssize_t size); /* sp_get_error - get the error code of a sump pump. * * Returns: SP_OK if no error has occurred, otherwise the error code. */ int sp_get_error(sp_t sp); /* sp_wait - can be called by an external thread, e.g. the thread that * called sp_start(), to wait for all sump pump activity to cease. * * Returns: SP_OK or a sump pump error code */ int sp_wait(sp_t sp); /* sp_open_file_src - use the specified file as the input for the * specified sump pump. * * Parameters: * sp - sp_t identifier for which we are opening the input file. * fname_mods - Name of the file, potentially followed by one or more * of the following modifiers (with no intervening spaces): * ,BUFFERED or ,BUF The file will be read with normal * buffered (not direct) reads. * ,DIRECT or ,DIR The file will be read with direct * and asynchronous reads. * ,TRANSFER=%d{k,m,g} or ,TRANS=%d{k,m,g} or ,TR=%d{k,m,g} * The transfer size (read request size) * is specified in kilo, mega or giga * bytes. * ,COUNT=%d or ,CO=%d The count of the maximum number of * outstanding asynchronous read requests * is given. * Example: * myfilename,dir,trans=4m,co=4 * The above example specifies a file * name of "myfilename", with direct * and asynchronous reads, with a * request size of 4 MB, and a maximum * of 4 outstanding asynchronous read * requests at any time. * * Returns: NULL if an error occurs in opening the file, otherwise * a valid sump pump file structure. */ sp_file_t sp_open_file_src(sp_t sp, const char *fname_mods); /* sp_open_file_dst - use the specified file as the output for the * specified output of the specified sump pump. * * Parameters: * sp - sp_t identifier for which we are opening an output file. * out_index - Integer indicating the sump pump output index. * fname_mods - Name of the file, potentially followed by one or more * of the following modifiers (with no intervening spaces): * ,BUFFERED or ,BUF The file will be written with normal * buffered (not direct) writes. * ,DIRECT or ,DIR The file will be written with direct * and asynchronous writes. * ,TRANSFER=%d{k,m,g} or ,TRANS=%d{k,m,g} or ,TR=%d{k,m,g} * The transfer size (write request size) * is specified in kilo, mega or giga * bytes. * ,COUNT=%d or ,CO=%d The count of the maximum number of * outstanding asynchronous write requests * is given. * Example: * myfilename,dir,trans=4m,co=4 * The above example specifies a file * name of "myfilename", with direct * and asynchronous writes, with a * request size of 4 MB, and a maximum * of 4 outstanding asynchronous write * requests at any time. * * Returns: NULL if an error occurs in opening the file, otherwise * a valid sump pump file structure. */ sp_file_t sp_open_file_dst(sp_t sp, unsigned out_index, const char *fname_mods); /* sp_file_wait - wait for the specified file connection to complete. * * Returns: SP_OK or a sump pump error code */ int sp_file_wait(sp_file_t sp_file); /* sp_get_error_string - return an error message string * * Returns: a string containing a sump pump error message. The string should * NOT be free()'d and is valid until the passed sp_t is sp_free()'d. */ const char *sp_get_error_string(sp_t sp, int error_code); /* sp_free - free the specified sp_t and its associated state. */ void sp_free(sp_t *caller_sp); /* sp_file_free - free the specified sp_file_t and its associated state. */ void sp_file_free(sp_file_t *caller_sp_file); /* Library Functions for Pump Functions * ------------------------------------ * * The API calls specific to pump functions all take as an argument the * sp_task_t data type that is passed to the pump function by the sump * pump infrastructure. * * "Make my funk the P.Funk, I wants to get funked up." - George Clinton */ /* pfunc_get_rec - get a pointer to the next input record for a pump function. * The sump pump infrastructure allocates the record buffers * and modifies the pointer-to-a-pointer argument to point to * the buffer. If the record type is text, a null character * will terminate the record. * * Returns: 0 if no more records are in the sump pump task input, otherwise * the number bytes in the record not including the terminating null * character. */ size_t pfunc_get_rec(sp_task_t t, void *ptr_to_rec_ptr); /* pfunc_get_in_buf - get a pointer to the input buffer for a pump function. * * Returns: SP_OK or a sump pump error code */ int pfunc_get_in_buf(sp_task_t t, void **buf, size_t *size); /* pfunc_get_out_buf - get a pointer to an output buffer and its size for * a pump function. * * Returns: SP_OK or a sump pump error code */ int pfunc_get_out_buf(sp_task_t t, unsigned out_index, void **buf, size_t *size); /* pfunc_put_out_buf_bytes - flush bytes that have been placed in a pump * function's output buffer. This routine can only * be used by first calling pf_get_out_buf(). * * Returns: SP_OK or a sump pump error code */ int pfunc_put_out_buf_bytes(sp_task_t t, unsigned out_index, size_t size); /* pfunc_get_thread_index - can be used by pump functions to get the * index of the sump pump thread executing the * pump func. For instance, if there are 4 * sump pump threads, this function will return * 0-3 depending on which of the 4 threads is * invoking it. * * Returns: The index of the requesting sump pump thread. */ int pfunc_get_thread_index(sp_task_t t); /* pfunc_get_task_number - can be used by pump functions to get the sump * pump task number being executed by the pump * function. This number starts at 0 and * increases with each subsequent task * issued/started by the sump pump. * * Returns: The task number (starting with 0 as the first task) that the * calling thread is currently executing. */ uint64_t pfunc_get_task_number(sp_task_t t); /* pfunc_write - write function that can be used by a pump function to * write the output data for the pump function. * * Returns: the number of bytes written. If this is not the same as the * requested size, an error has occurred. */ size_t pfunc_write(sp_task_t t, unsigned out_index, void *buf, size_t size); /* pfunc_printf - print a formatted string to a pump functions's output. * * Returns: the number of bytes written. If this is not the same as the * requested size, an error has occurred. */ int pfunc_printf(sp_task_t t, unsigned out_index, const char *fmt, ...); /* pfunc_error - raise an error for a pump function and define an error * string that can be retrived by other threads using * sp_get_error_string(). * * Returns: SP_PUMP_FUNCTION_ERROR */ int pfunc_error(sp_task_t t, const char *fmt, ...); /* pfunc_mutex_lock - lock the auto-allocated mutex for pump functions. */ void pfunc_mutex_lock(sp_task_t t); /* pfunc_mutex_unlock - unlock the auto-allocated mutex for pump functions. */ void pfunc_mutex_unlock(sp_task_t t); /* Sump Pump Library Error Codes * ----------------------------- * * Many sump pump API call return an integer indicates one the following * success or error conditions: */ /* SP_OK - no error occurred, everything OK */ #define SP_OK 0 /* SP_FILE_READ_ERROR - a file read error occurred */ #define SP_FILE_READ_ERROR (-1) /* SP_FILE_WRITE_ERROR - a file write error occurred */ #define SP_FILE_WRITE_ERROR (-2) /* SP_UPSTREAM_ERROR - upstream error occurred (equivalent of broken pipe) */ #define SP_UPSTREAM_ERROR (-3) /* SP_REDUNDANT_EOF - an eof declaration (zero-sized sp_write_input()) has * already been made */ #define SP_REDUNDANT_EOF (-4) /* SP_MEM_ALLOC_ERROR - a memory allocation failure occurred */ #define SP_MEM_ALLOC_ERROR (-5) /* SP_FILE_OPEN_ERROR - a file open() error occurred */ #define SP_FILE_OPEN_ERROR (-6) /* SP_WRITE_ERROR - a file write error occurred */ #define SP_WRITE_ERROR (-7) /* SP_SORT_DEF_ERROR - an error occurred in the internal call to * nsort_define(). Check the nsort definition * string. */ #define SP_SORT_DEF_ERROR (-8) /* SP_SORT_EXEC_ERROR - an error occurred during sort execution. */ #define SP_SORT_EXEC_ERROR (-9) /* SP_GROUP_BY_MISMATCH - a call to sp_link() linked a sump pump output * where the "-match" directive for the source * sort sump pump did not match the GROUP_BY * directive for the linked to sump pump. * Either neither should be specified or both, * not just one. */ #define SP_GROUP_BY_MISMATCH (-10) /* SP_SORT_INCOMPATIBLE - sump pump read/write function incompatible with a sort sump pump */ #define SP_SORT_INCOMPATIBLE (-11) /* SP_BUF_INDEX_ERROR - sump pump buffer index passed to a read/write function is invalid. */ #define SP_BUF_INDEX_ERROR (-12) /* SP_OUTPUT_INDEX_ERROR - sump pump output channel index passed to a read/write function is invalid. */ #define SP_OUTPUT_INDEX_ERROR (-13) /* SP_START_ERROR - sp_start() error */ #define SP_START_ERROR (-14) /* SP_SYNTAX_ERROR - error in sump pump parameter syntax */ #define SP_SYNTAX_ERROR (-15) /* SP_NSORT_LINK_FAILURE - link attempt to nsort library failed */ #define SP_NSORT_LINK_FAILURE (-16) /* SP_SORT_NOT_COMPILED - sump pump was compiled without sort support */ #define SP_SORT_NOT_COMPILED (-17) #define SP_PUMP_FUNCTION_ERROR (-1000) #endif /* !_SUMP_PUMP_H_ */