/* valsort.c - Sort output data validator * * $Date: 2013/03/15 04:43:14 $ * Chris Nyberg <chris.nyberg@ordinal.com> * * Copyright (C) 2009 - 2013 * * This program is free software; you can redistribute it and/or * modify it under the terms of Version 2 of the GNU General Public * License as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ char *Version = "1.5"; #include <stdio.h> #include <stdlib.h> #include <string.h> #include <zlib.h> /* use crc32() function */ #include "rand16.h" #if defined(SUMP_PUMP) # include <fcntl.h> # include "sump.h" #endif /* end defined(SUMP_PUMP) */ #define REC_SIZE 100 #define SUM_SIZE (sizeof(struct summary)) #if defined(_WIN32) # define strcasecmp _stricmp #endif /* Comparison routine, either memcmp() or strcasecmp() */ int (*Compare)(const unsigned char *a, const unsigned char *b, size_t n) = (int (*)(const unsigned char *a, const unsigned char *b, size_t n))memcmp; int Read_summary; /* Read a file of partition summaries, not records */ int Quiet; /* Quiet mode, don't output diagnostic information */ int Multithreaded; /* boolean indicating if a sump pump is being used */ /* struct used to summarize a partition of sort output */ struct summary { u16 first_unordered; /* index of first unordered record, * or 0 if no unordered records */ u16 unordered_count; /* total number of unordered records*/ u16 rec_count; /* total number of records */ u16 dup_count; /* total number of duplicate keys */ u16 checksum; /* checksum of all records */ unsigned char first_rec[REC_SIZE]; /* first record */ unsigned char last_rec[REC_SIZE]; /* last record */ }; struct summary Summary; /* next_rec - get the next record to be validated * */ unsigned char *next_rec(void *in, unsigned char *rec_buf, struct summary *sum) { int read_size; unsigned char *rec = NULL; u16 temp16 = {0LL, 0LL}; #if defined(SUMP_PUMP) if (Multithreaded) { /* get the record from the sump pump infrastructure */ read_size = (int)pfunc_get_rec(in, &rec); } else #endif { read_size = fread(rec_buf, 1, REC_SIZE, in); rec = rec_buf; } if (read_size == REC_SIZE) { temp16.lo8 = crc32(0, rec, REC_SIZE); sum->checksum = add16(sum->checksum, temp16); } else if (read_size == 0) return (NULL); else if (read_size < 0) { fprintf(stderr, "record read error\n"); exit(1); } else { fprintf(stderr, "partial record found at end\n"); exit(1); } return (rec); } /* summarize_records - summarize the validity of a sequence of records. */ int summarize_records(void *in, void *unused) { struct summary *sum; int diff; u16 one = {0LL, 1LL}; unsigned char *rec; unsigned char rec_buf[REC_SIZE]; unsigned char prev[REC_SIZE]; char sumbuf[U16_ASCII_BUF_SIZE]; #if defined(SUMP_PUMP) struct summary local_summary; if (Multithreaded) { sum = &local_summary; memset(sum, 0, sizeof(struct summary)); } else #endif sum = &Summary; if ((rec = next_rec(in, rec_buf, sum)) == NULL) { fprintf(stderr, "there must be at least one record to be validated\n"); exit(1); } memcpy(sum->first_rec, rec, REC_SIZE); memcpy(prev, rec, REC_SIZE); sum->rec_count = add16(sum->rec_count, one); while ((rec = next_rec(in, rec_buf, sum)) != NULL) { /* make sure the record key is equal to or more than the * previous key */ diff = (*Compare)(prev, rec, 10); if (diff == 0) sum->dup_count = add16(sum->dup_count, one); else if (diff > 0) { if (sum->first_unordered.hi8 == 0 && sum->first_unordered.lo8 == 0) { sum->first_unordered = sum->rec_count; if (!Multithreaded && !Quiet) fprintf(stderr, "First unordered record is record %s\n", u16_to_dec(sum->first_unordered, sumbuf)); } sum->unordered_count = add16(sum->unordered_count, one); } sum->rec_count = add16(sum->rec_count, one); memcpy(prev, rec, REC_SIZE); } memcpy(sum->last_rec, prev, REC_SIZE); /* set last record for summary */ #if defined(SUMP_PUMP) if (Multithreaded) { pfunc_write(in, 0, sum, SUM_SIZE); return (SP_OK); } else #endif return (0); } /* next_sum - get the next partition summary */ int next_sum(void *in, struct summary *sum) { int ret; #if defined(SUMP_PUMP) if (Multithreaded) ret = (int)sp_read_output(in, 0, sum, SUM_SIZE); /* from sump pump */ else #endif ret = fread(sum, 1, SUM_SIZE, in); /* get from file */ if (ret == 0) return (0); else if (ret < 0) { fprintf(stderr, "summary read error\n"); exit(1); } else if (ret != SUM_SIZE) { fprintf(stderr, "partial partition summary found at end\n"); exit(1); } return (ret); } /* sum_summaries - validate a sequence of partition summaries */ void sum_summaries(void *in) { int diff; u16 one = {0LL, 1LL}; unsigned char prev[REC_SIZE]; char sumbuf[U16_ASCII_BUF_SIZE]; struct summary local_sum; if (next_sum(in, &local_sum) == 0) { fprintf(stderr, "there must be at least one record to be validated\n"); exit(1); } memcpy(&Summary, &local_sum, SUM_SIZE); memcpy(prev, Summary.last_rec, REC_SIZE); if (Summary.first_unordered.hi8 != 0 || Summary.first_unordered.lo8 != 0) { if (!Quiet) fprintf(stderr, "First unordered record is record %s\n", u16_to_dec(Summary.first_unordered, sumbuf)); } while (next_sum(in, &local_sum)) { /* make sure the record key is equal to or more than the * previous key */ diff = (*Compare)(prev, local_sum.first_rec, 10); if (diff == 0) Summary.dup_count = add16(Summary.dup_count, one); else if (diff > 0) { if (Summary.first_unordered.hi8 == 0 && Summary.first_unordered.lo8 == 0) { if (!Quiet) fprintf(stderr, "First unordered record is record %s\n", u16_to_dec(Summary.rec_count, sumbuf)); Summary.first_unordered = Summary.rec_count; } Summary.unordered_count = add16(Summary.unordered_count, one); } if ((Summary.first_unordered.hi8 == 0 && Summary.first_unordered.lo8 == 0) && !(local_sum.first_unordered.hi8 == 0 && local_sum.first_unordered.lo8 == 0)) { Summary.first_unordered = add16(Summary.rec_count, local_sum.first_unordered); if (!Quiet) fprintf(stderr, "First unordered record is record %s\n", u16_to_dec(Summary.first_unordered, sumbuf)); } Summary.unordered_count = add16(Summary.unordered_count, local_sum.unordered_count); Summary.rec_count = add16(Summary.rec_count, local_sum.rec_count); Summary.dup_count = add16(Summary.dup_count, local_sum.dup_count); Summary.checksum = add16(Summary.checksum, local_sum.checksum); memcpy(prev, local_sum.last_rec, REC_SIZE); } memcpy(Summary.last_rec, prev, REC_SIZE); /* get last rec of last summary*/ } /* get_input_fp - get input file pointer by opening input file for reading */ FILE *get_input_fp(char *filename) { FILE *in; if ((in = fopen(filename, "r")) == NULL) { perror(filename); exit(1); } return (in); } static char usage_str[] = "Valsort Sort Output Validator\n" "\n" "usage: valsort [-i] [-q] " #if defined(SUMP_PUMP) "[-tN] [-o SUMMARY_FILE] [-s] FILE_NAME[,opts]\n" #else "[-o SUMMARY_FILE] [-s] FILE_NAME\n" #endif "-i Use case insensitive ascii comparisons (optional for PennySort).\n" " Case sensitive ascii or binary keys are assumed by default.\n" "-q Quiet mode, don't output diagnostic text.\n" "-o SUMMARY_FILE Output a summary of the validated records. This method\n" " can be used to validate partitioned sort outputs separately.\n" " The contents of the separate summary files can then be\n" " concatenated into a single file that can be checked using\n" " the valsort program with the -s flag.\n" "-s The file to validate contains partition summaries instead of\n" " sorted records.\n" #if defined(SUMP_PUMP) "-tN Use N internal program threads to validate the records.\n" "FILE_NAME[,opts] The name of the sort output file or the partition\n" " summaries file to validate. File options may immediately\n" " follow the file name:\n" " ,buf Use buffered and synchronous file reads,\n" " instead of the default direct and asynchronous\n" " reads.\n" " ,dir Use direct and asynchronous file reads.\n" " The is the default.\n" " ,trans=N[k,m,g] Sets the file read request size in bytes,\n" " kilobytes, megabytes or gigabytes.\n" " ,count=N Sets the maximum number of simultaneous\n" " asynchronous read requests allowed.\n" #else "FILE_NAME The name of the sort output file or the partition summaries\n" " file to validate.\n" #endif "\n" "Example 1 - to validate the sorted order of a single sort output file:\n" " valsort sortoutputfile\n" "\n" "Example 2 - to validate the sorted order of output that has been\n" "partitioned into 4 output files: out0.dat, out1.dat, out2.dat and out3.dat:\n" " valsort -o out0.sum out0.dat\n" " valsort -o out1.sum out1.dat\n" " valsort -o out2.sum out2.dat\n" " valsort -o out3.sum out3.dat\n" " cat out0.sum out1.sum out2.sum out3.sum > all.sum\n" " valsort -s all.sum\n" "\n" "Copyright (C) 2009 - 2011, Chris Nyberg\n" "\n" "This program is free software; you can redistribute it and/or\n" "modify it under the terms of Version 2 of the GNU General Public\n" "License as published by the Free Software Foundation.\n" "\n" "This program is distributed in the hope that it will be useful,\n" "but WITHOUT ANY WARRANTY; without even the implied warranty of\n" "MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n" "GNU General Public License for more details.\n" "\n" "You should have received a copy of the GNU General Public License\n" "along with this program; if not, write to the Free Software Foundation,\n" "Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.\n" ; void usage(void) { fprintf(stderr, usage_str); fprintf(stderr, "\nVersion %s, cvs $Revision: 1.14 $\n", Version); #if defined(SUMP_PUMP) fprintf(stderr, "SUMP Pump version %s\n", sp_get_version()); #endif exit(1); } int main(int argc, char *argv[]) { FILE *in; char sumbuf[U16_ASCII_BUF_SIZE]; FILE *out = NULL; #if defined(SUMP_PUMP) int number_threads = 0; int ret; sp_t sp_val; /* handle for sump pump */ #endif while (argc > 1 && argv[1][0] == '-') { if (argv[1][1] == 'i') Compare = (int (*)(const unsigned char *a, const unsigned char *b, size_t n))strcasecmp; else if (argv[1][1] == 'o') { if (argc < 4 || argv[2][0] == '-') usage(); if ((out = fopen(argv[2], "w")) == NULL) { perror(argv[2]); exit(1); } argc--; argv++; } else if (argv[1][1] == 'q') Quiet = 1; else if (argv[1][1] == 's') Read_summary = 1; #if defined(SUMP_PUMP) else if (argv[1][1] == 't') number_threads = atoi(argv[1] + 2); #endif else usage(); argc--; argv++; } if (argc != 2) usage(); /* if we are validating output partition summaries */ if (Read_summary) { in = get_input_fp(argv[1]); sum_summaries(in); } /* else we are validating a file with records in sorted order */ #if defined(SUMP_PUMP) else if (number_threads != 1) { Multithreaded = 1; /* start a sump pump to validate the correct order of records. * the sump pump infrastructure will break the file being * validated into separate blocks that will be summarized by * separate threads. this thread will then validate the * summaries. */ ret = sp_start(&sp_val, (sp_pump_t)summarize_records, "-IN_FILE=%s -IN_BUF_SIZE=%d -REC_SIZE=%d " "-OUT_BUF_SIZE[0]=%d -THREADS=%d", argv[1], 4 * 1024 * 1024, /* input buf size */ REC_SIZE, /* input record size */ sizeof(struct summary), /* output buf size */ number_threads); if (ret) { fprintf(stderr, "sp_start failed: %s\n", sp_get_error_string(sp_val, ret)); return (ret); } /* sum the summaries that are the output of the sump pump */ sum_summaries(sp_val); } #endif else { /* else validate the order of records with using only main thread */ in = get_input_fp(argv[1]); summarize_records(in, NULL); } /* if requested ("-o"), output a summary file */ if (out != NULL) fwrite(&Summary, SUM_SIZE, 1, out); if (!Quiet) { fprintf(stderr, "Records: %s\n", u16_to_dec(Summary.rec_count, sumbuf)); fprintf(stderr, "Checksum: %s\n", u16_to_hex(Summary.checksum, sumbuf)); if (Summary.unordered_count.hi8 | Summary.unordered_count.lo8) { fprintf(stderr, "ERROR - there are %s unordered records\n", u16_to_dec(Summary.unordered_count, sumbuf)); } else { fprintf(stderr, "Duplicate keys: %s\n", u16_to_dec(Summary.dup_count, sumbuf)); fprintf(stderr, "SUCCESS - all records are in order\n"); } } /* return non-zero if there are any unordered records */ return (Summary.unordered_count.hi8 | Summary.unordered_count.lo8) ? 1 : 0; }