#!/usr/bin/env python2.7
""" Thu Jul 18 01:36:21 PDT 2013 Kevin Karplus

The density-function.py file is mainly intended for use as a module
(rather than as a standalone program).

The main entry points are

cumulative
        converts a list of sortable items into an iterator over
        sorted pairs of (item, cumulative count)

binned_cumulative
        converts a list of numbers into a sorted list of
        (threshold, cumulative count)
        where the thresholds are bin boundaries
        (not equal to any numbers on the list).
        The first pair has a cumulative count of 0 and a threshold
        less than any number on the input list.
        The last pair has a cumulative count equal to the length of
        the input list and a threshold larger than any element of
        the list.
        Users specify the number of bins, and get (approximately)
        one more pair on the list than the specified number.
        Different binning methods can be specified:
            width    fixed-width bins
            count    fixed-count bins
            tapered  fixed-count bins with smaller counts near the
                     beginning and end
            area     fixed width*count bins

density_function
        takes the output of binned_cumulative and produces a
        plottable series of pairs (threshold, probability density).
        Output format may be
            steps   two points per bin to make step-wise function
            lines   linear interpolation between bin centers

As a stand-alone program, the file converts a list of numbers into a
table of pairs that can be plotted as a probability density function.
The gnuplot command plot '0) cum_pairs = [ x for x in cumulative(numbers)] if len(cum_pairs)==0: return [(0,0), (1,0)] first_x = cum_pairs[0][0] if len(cum_pairs)==1: return [(first_x-0.5,0), (first_x+0.5,cum_pairs[0][1])] # project out bin boundaries past real data, using first 2 and last values cum_pairs.insert(0, tuple( (1.5*first_x-0.5*cum_pairs[1][0], 0)) ) cum_pairs.append( tuple( (1.5*cum_pairs[-1][0]-0.5*cum_pairs[-2][0], count)) ) if num_bins==1: return [ cum_pairs[0], cum_pairs[-1] ] total_width = cum_pairs[-1][0] - cum_pairs[0][0] if method=="width": # use fixed-width bins (total interval/num_bins) counts = [0]*num_bins bin_width = total_width/num_bins bin_scale = num_bins/total_width start=cum_pairs[0][0] oldc=0 for x,c in cum_pairs: subscript=int( (x-start)*bin_scale ) if (subscriptlen(cum_pairs)-2: num_bins=len(cum_pairs)-2 if method=="count": # Use bins that are approximately equal counts. # This method does a low-to-high sweep setting boundaries, # which may result in target counts getting lower towards the end, # as earlier boundaries overshoot their target counts. remaining_count = count remaining_bins = num_bins cum_to_find = remaining_count/remaining_bins thinned = [cum_pairs[0]] # zero count at beginning for x,y in izip(cum_pairs,islice(cum_pairs,1,None)): if x[1]>= cum_to_find: thinned.append( ( (x[0]+y[0])/2, x[1] ) ) remaining_count = count-x[1] remaining_bins -=1 if remaining_bins==0: break cum_to_find = x[1] + remaining_count/remaining_bins # print("DEBUG: cum_pairs=",cum_pairs, file=sys.stderr) if thinned[-1][1] == cum_pairs[-1][1]: # the last bin covered all counts, # but may not include the extension thinned = thinned[:-1] thinned.append(cum_pairs[-1]) return thinned if method=="tapered": # This method is like "count" but tapers the bin sizes towards the ends approx_bin_count = count/(num_bins-1) # size for middle bins # num_end_bins is how many bins on each end to ramp up size over. 
# The total count for the first num_end_bins is about half the middle bins. # (Same for the last num_end_bins) num_end_bins = min(num_bins//10, (len(cum_pairs)-num_bins)//2) if num_end_bins==0: bin_size = count/num_bins cum_to_find = [round(i*bin_size) for i in xrange(1,num_bins+1)] else: effective_num_bins = num_bins-2*num_end_bins +1 bin_size = count/effective_num_bins # average count for middle bins first_size = bin_size/num_end_bins cum_to_find = [] cum=0 size = first_size # ramp up from the beginning cum_to_find = [ int(round(bin_size/(num_end_bins+1) *i*(i+1)/2)) for i in xrange(1,num_end_bins+1)] # fill in the middle cum_so_far = cum_to_find[-1] from_end = [count] + [count-x for x in cum_to_find[0:num_end_bins]] middle_bin_size = (count-2*cum_so_far)/ (num_bins-2*num_end_bins) # print("DEBUG: cum_so_far=", cum_so_far, " middle_bin_size=",middle_bin_size, file=sys.stderr) cum_to_find.extend( [ int(round(middle_bin_size*(i-num_end_bins+1)+cum_so_far)) for i in xrange(num_end_bins, num_bins-num_end_bins-1)]) # make the second half by reversing the first half and counting from the end # print("DEBUG: from_end=", from_end, " len(cum_to_find)=",len(cum_to_find), file=sys.stderr) cum_to_find.extend(from_end[::-1]) # print("DEBUG: num_bins=", num_bins, " len(cum_to_find)=",len(cum_to_find), " cum_to_find=",cum_to_find, file=sys.stderr) thinned = [cum_pairs[0]] # zero count at beginning bin=0 for x,y in izip(cum_pairs,islice(cum_pairs,1,None)): if x[1]>= cum_to_find[bin]: # print("DEBUG: x=",x," y=",y, file=sys.stderr) thinned.append( ( (x[0]+y[0])/2, x[1] ) ) bin+=1 # print("DEBUG: cum_pairs=",cum_pairs, file=sys.stderr) if thinned[-1][1] == cum_pairs[-1][1]: # the last bin covered all counts, but may not include the extension thinned = thinned[:-1] thinned.append(cum_pairs[-1]) return thinned if method=="area": # This method scales the bins so that # the product of the count and the binwidth are roughly constant. 
remaining_area = (cum_pairs[-1][0] - cum_pairs[0][0])*count remaining_bins = num_bins thinned = [cum_pairs[0]] # zero count at beginning for x,y in izip(cum_pairs,islice(cum_pairs,1,None)): boundary = (x[0]+y[0])/2 bin_count = x[1]-thinned[-1][1] width = boundary - thinned[-1][0] if bin_count*width >= remaining_area/(remaining_bins*remaining_bins): thinned.append( ( boundary, x[1] ) ) remaining_area = (cum_pairs[-1][0] - boundary)*(count-x[1]) remaining_bins -= 1 if remaining_bins == 0: break if thinned[-1][1] == cum_pairs[-1][1]: # the last bin covered all counts, # but may not include the extension thinned = thinned[:-1] thinned.append(cum_pairs[-1]) # print("DEBUG: num_bins=",num_bins," len(thinned)=",len(thinned), file=sys.stderr) return thinned def density_function(cum_pairs,smoothing="steps"): """ This is a generator that yields points. Converts a cumulative pair list [ (x0,0) ... (xn,total_count)] into a probability density function for plotting. Note: x0< x1< ... =2) count = cum_pairs[-1][1] if count<=0: return if smoothing=="steps" or len(cum_pairs)==2: yield (cum_pairs[0][0], 0) else: yield ( 1.5*cum_pairs[0][0] - 0.5*cum_pairs[1][0], 0) for old_pair,pair in izip(cum_pairs,islice(cum_pairs,1,None)): old_threshold = old_pair[0] threshold = pair[0] level = (pair[1]-old_pair[1])/(count*(threshold-old_threshold)) if smoothing=="steps": yield (old_threshold, level) yield (threshold, level) else: yield ( (threshold+old_threshold)/2, level) if smoothing=="steps" or len(cum_pairs)==2: yield (cum_pairs[-1][0],0) else: yield ( 1.5*cum_pairs[0][-1] - 0.5*cum_pairs[1][-2], 0) # --------------------------------------------------------- # Below this line are functions primarily for testing or using the # module as a stand-alone program. 
def positive_int(string):
    """Type converter for argparse allowing only int > 0.

    Raises argparse.ArgumentTypeError for zero or negative values.
    """
    value = int(string)
    if value <= 0:
        msg = "{} is not a positive integer".format(string)
        raise argparse.ArgumentTypeError(msg)
    return value


def parse_args(argv):
    """Parse the command-line options, returning them all as one namespace.

    argv is the full argument vector (argv[0] is the program name and
    is skipped).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.set_defaults(column=1,
                        num_bins=None,
                        method="area",
                        smoothing="steps")
    parser.add_argument("--column", "-c", action="store", type=positive_int,
        help="""Which (white-space separated) column of each line to read.
        One-based indexing.
        """)
    parser.add_argument("--num_bins", "-n", action="store", type=positive_int,
        help="""Number of bins to use for "tapered" variant.
        Approximate number of bins for "area" variant.
        Default is sqrt(count)+1.
        """)
    parser.add_argument("--method", "-m",
        choices=["width", "count", "tapered", "area"],
        help="""Different algorithms for choosing bin widths:
        width    fixed-width bins (the classic method for histograms)
        count    roughly fixed-count bins, giving finer resolution
                 where the probability density is higher.
        tapered  has roughly equal counts in the middle, but reduces
                 the counts towards the two ends, to get better
                 resolution in the tails, where counts are low
        area     has roughly equal count*bin_width throughout,
                 providing good resolution in both high-density and
                 low-density
        Default is area.
        """)
    parser.add_argument("--smoothing", "-s", choices=["steps", "lines"],
        help="""Different ways of output the density function:
        steps   step for each bin (two points per bin)
        lines   straight lines between bin centers
        """)
    return parser.parse_args(argv[1:])


def column(file_obj, col_num=0):
    """Yields one number from each line of a file,
    ignoring blank lines or comment lines whose first
    non-white-space is #.

    Columns are numbered with zero-based indexing.
    """
    for line in file_obj:
        line = line.strip()
        if line == "" or line.startswith("#"):
            continue   # skip blank and comment lines
        fields = line.split()
        yield float(fields[col_num])


def main(args):
    """Example of using the density_function and binned_cumulative functions.

    This function (and the parse_args function) are not used when
    density_function.py is used as module.
    """
    # BUG FIX: original ignored the args parameter and re-read sys.argv;
    # use the argument vector actually passed in.
    options = parse_args(args)
    numbers = [x for x in column(sys.stdin, options.column - 1)]
    cum_bins = binned_cumulative(numbers,
                                 num_bins=options.num_bins,
                                 method=options.method)
    # print ("DEBUG: cum_bins=",cum_bins,file=sys.stderr)
    for x, cum in density_function(cum_bins, smoothing=options.smoothing):
        print (x, "\t", cum)


if __name__ == "__main__":
    try:
        sys.exit(main(sys.argv))
    except EnvironmentError as err:
        # FIX: "except E as (errno, strerr)" is Python-2-only tuple-unpacking
        # syntax; attribute access behaves identically on 2.7 and stays
        # parseable under Python 3.
        sys.stderr.write("ERROR: " + str(err.strerror) + "\n")
        sys.exit(err.errno)