# -*- coding: utf-8 -*-
# Copyright (c) 2016-2017, Zhijiang Yao, Jie Dong and Dongsheng Cao
# All rights reserved.
# This file is part of the PyBioMed.
# The contents are covered by the terms of the BSD license
# which is included in the file license.txt, found at the root
# of the PyBioMed source tree.
"""
##############################################################################
A class used for computing different types of DNA descriptors!
You can freely use and distribute it. If you have any problem,
you could contact with us timely.
Authors: Zhijiang Yao and Dongsheng Cao.
Date: 2016.06.14
Email: gadsby@163.com and oriental-cds@163.com
##############################################################################
"""
import sys
import math
import itertools
[docs]def Frequency(tol_str, tar_str):
"""
#################################################################
Generate the frequency of tar_str in tol_str.
:param tol_str: mother string.
:param tar_str: substring.
#################################################################
"""
i, j, tar_count = 0, 0, 0
len_tol_str = len(tol_str)
len_tar_str = len(tar_str)
while i < len_tol_str and j < len_tar_str:
if tol_str[i] == tar_str[j]:
i += 1
j += 1
if j >= len_tar_str:
tar_count += 1
i = i - j + 1
j = 0
else:
i = i - j + 1
j = 0
return tar_count
[docs]def MakeKmerList(k, alphabet):
try:
return ["".join(e) for e in itertools.product(alphabet, repeat=k)]
except TypeError:
print("TypeError: k must be an inter and larger than 0, alphabet must be a string.")
raise TypeError
except ValueError:
print("TypeError: k must be an inter and larger than 0")
raise ValueError
[docs]def MakeUptoKmerList(k_values, alphabet):
# Compute the k-mer for each value of k.
return_value = []
for k in k_values:
return_value.extend(MakeKmerList(k, alphabet))
return return_value
[docs]def NormalizeVector(normalize_method, k_values, vector, kmer_list):
# Do nothing if there's no normalization.
if normalize_method == "none":
return vector
# Initialize all vector lengths to zeroes.
vector_lengths = {k: 0 for k in k_values}
# Compute sum or sum-of-squares separately for each k.
num_kmers = len(kmer_list)
for i_kmer in range(0, num_kmers):
kmer_length = len(kmer_list[i_kmer])
count = vector[i_kmer]
if normalize_method == "frequency":
vector_lengths[kmer_length] += count
elif normalize_method == "unitsphere":
vector_lengths[kmer_length] += count * count
# Square root each sum if we're doing 2-norm.
if normalize_method == "unitsphere":
for k in k_values:
vector_lengths[k] = math.sqrt(vector_lengths[k])
# Divide through by each sum.
return_value = []
for i_kmer in range(0, num_kmers):
kmer_length = len(kmer_list[i_kmer])
count = vector[i_kmer]
vector_length = vector_lengths[kmer_length]
if vector_length == 0:
return_value.append(0)
else:
return_value.append(float(count) / float(vector_length))
return return_value
# Make a copy of a given string, substituting one letter.
[docs]def Substitute(position, letter, string):
return_value = ""
if position > 0:
return_value = return_value + string[0:position]
return_value = return_value + letter
if position < (len(string) - 1):
return_value = return_value + string[position + 1:]
return return_value
[docs]def ComputeBinNum(num_bins, position, k, numbers):
# If no binning, just return.
if num_bins == 1:
return 0
# Compute the mean value for this k-mer.
mean = 0
for i in range(0, k):
mean += float(numbers[position + i])
mean /= k
# Find what quantile it lies in.
for i_bin in range(0, num_bins):
if mean <= boundaries[k][i_bin]:
break
# Make sure we didn't go too far.
if i_bin == num_bins:
sys.stderr.write("bin=num_bins=%d\n", i_bin)
sys.exit(1)
return i_bin
[docs]def MakeSequenceVector(sequence,
numbers,
num_bins,
revcomp,
revcomp_dictionary,
normalize_method,
k_values,
mismatch,
alphabet,
kmer_list,
boundaries,
pseudocount):
# Make an empty counts vector.
kmer_counts = []
for i_bin in range(0, num_bins):
kmer_counts.append({})
# Iterate along the sequence.
for k in k_values:
seq_length = len(sequence) - k + 1
for i_seq in range(0, seq_length):
# Compute which bin number this goes in.
bin_num = ComputeBinNum(num_bins, i_seq, k, numbers)
# Extract this k-mer.
kmer = sequence[i_seq: i_seq + k]
# If we're doing reverse complement, store the count in the
# the version that starts with A or C.
if revcomp == 1:
rev_kmer = FindRevcomp(kmer, revcomp_dictionary)
if cmp(kmer, rev_kmer) > 0:
kmer = rev_kmer
# Increment the count.
if kmer in kmer_counts[bin_num]:
kmer_counts[bin_num][kmer] += 1
else:
kmer_counts[bin_num][kmer] = 1
# Should we also do the mismatch?
if mismatch != 0:
# Loop through all possible mismatches.
for i_kmer in range(0, k):
for letter in alphabet:
# Don't count yourself as a mismatch.
if kmer[i_kmer:i_kmer + 1] != letter:
# Find the neighboring sequence.
neighbor = Substitute(i_kmer, letter, kmer)
# If we're doing reverse complement, store the
# count in the version that starts with A or C.
if revcomp == 1:
rev_kmer = FindRevcomp(kmer,
revcomp_dictionary)
if cmp(kmer, rev_kmer) > 0:
kmer = rev_kmer
# Increment the neighboring sequence.
if neighbor in kmer_counts[bin_num]:
kmer_counts[bin_num][neighbor] += mismatch
else:
kmer_counts[bin_num][neighbor] = mismatch
# Build the sequence vector.
sequence_vector = []
for i_bin in range(0, num_bins):
for kmer in kmer_list:
if kmer in kmer_counts[i_bin]:
sequence_vector.append(kmer_counts[i_bin][kmer] + pseudocount)
else:
sequence_vector.append(pseudocount)
# Normalize it
return_value = NormalizeVector(normalize_method,
k_values,
sequence_vector,
kmer_list)
return return_value
[docs]def ReadFastaSequence(numeric,
fasta_file):
# Read 1 byte.
first_char = fasta_file.read(1)
# If it's empty, we're done.
if first_char == "":
return ["", ""]
# If it's ">", then this is the first sequence in the file.
elif first_char == ">":
line = ""
else:
line = first_char
# Read the rest of the header line.
line = line + fasta_file.readline()
# Get the rest of the ID.
words = line.split()
if len(words) == 0:
sys.stderr.write("No words in header line (%s)\n" % line)
sys.exit(1)
id = words[0]
# Read the sequence, through the next ">".
first_char = fasta_file.read(1)
sequence = ""
while (first_char != ">") and (first_char != ""):
if first_char != "\n": # Handle blank lines.
line = fasta_file.readline()
sequence = sequence + first_char + line
first_char = fasta_file.read(1)
# Remove EOLs.
clean_sequence = ""
for letter in sequence:
if letter != "\n":
clean_sequence += letter
sequence = clean_sequence
# Remove spaces.
if numeric == 0:
clean_sequence = ""
for letter in sequence:
if letter != " ":
clean_sequence = clean_sequence + letter
sequence = clean_sequence.upper()
return [id, sequence]
[docs]def ReadSequenceAndNumbers(fasta_file,
numbers_filename,
numbers_file):
[fasta_id, fasta_sequence] = ReadFastaSequence(0, fasta_file)
if numbers_filename != "":
[number_id, number_sequence] = ReadFastaSequence(1, number_file)
# Make sure we got the same ID.
if fasta_id != number_id:
sys.stderr.write("Found mismatching IDs (%s != %d)\n" %
(fasta_id, number_id))
sys.exit(1)
# Split the numbers into a list.
number_list = number_sequence.split()
# Verify that they are the same length.
if len(fasta_sequence) != len(number_list):
sys.stderr.write("Found sequence of length %d with %d numbers.\n"
% (len(sequence), len(number_list)))
print(sequence)
print(numbers)
sys.exit(1)
else:
number_list = ""
return fasta_id, fasta_sequence, number_list
[docs]def FindRevcomp(sequence,
revcomp_dictionary):
# Save time by storing reverse complements in a hash.
if sequence in revcomp_dictionary:
return revcomp_dictionary[sequence]
# Make a reversed version of the string.
rev_sequence = list(sequence)
rev_sequence.reverse()
rev_sequence = ''.join(rev_sequence)
return_value = ""
for letter in rev_sequence:
if letter == "A":
return_value += "T"
elif letter == "C":
return_value += "G"
elif letter == "G":
return_value += "C"
elif letter == "T":
return_value += "A"
elif letter == "N":
return_value += "N"
else:
sys.stderr.write("Unknown DNA character (%s)\n" % letter)
sys.exit(1)
# Store this value for future use.
revcomp_dictionary[sequence] = return_value
return return_value
[docs]def ComputeQuantileBoundaries(num_bins,
k_values,
number_filename):
if num_bins == 1:
return
# The boundaries are stored in a 2D dictionary.
boundaries = {}
# Enumerate all values of k.
for k in k_values:
# Open the number file for reading.
number_file = open(number_filename, "r")
# Read it sequence by sequence.
all_numbers = []
[id, numbers] = ReadFastaSequence(1, number_file)
while id != "":
# Compute and store the mean of all k-mers.
number_list = numbers.split()
num_numbers = len(number_list) - k
for i_number in range(0, num_numbers):
if i_number == 0:
sum = 0;
for i in range(0, k):
sum += float(number_list[i])
else:
sum -= float(number_list[i_number - 1])
sum += float(number_list[i_number + k - 1])
all_numbers.append(sum / k)
[id, numbers] = ReadFastaSequence(1, number_file)
number_file.close()
# Sort them.
all_numbers.sort()
# Record the quantile.
boundaries[k] = {}
num_values = len(all_numbers)
bin_size = float(num_values) / float(num_bins)
sys.stderr.write("boundaries k=%d:" % k)
for i_bin in range(0, num_bins):
value_index = int((bin_size * (i_bin + 1)) - 1)
if value_index == num_bins - 1:
value_index = num_values - 1
value = all_numbers[value_index]
boundaries[k][i_bin] = value
sys.stderr.write(" %g" % boundaries[k][i_bin])
sys.stderr.write("\n")
return boundaries
#########################################################################
# The start of Changing by aleeee.
#########################################################################
[docs]def cmp(a, b):
return (a > b) - (a < b)
[docs]def MakeRevcompKmerList(kmer_list):
revcomp_dictionary = {}
new_kmer_list = [kmer for kmer in kmer_list if cmp(kmer, FindRevcomp(kmer, revcomp_dictionary)) <= 0]
return new_kmer_list
[docs]def MakeIndexUptoKRevcomp(k):
"""
###########################################################################
Generate the index for revcomp and from 1 to k.
###########################################################################
"""
sum = 0
index = [0]
for i in range(1, k + 1):
if i % 2 == 0:
sum += (math.pow(2, 2 * i - 1) + math.pow(2, i - 1))
index.append(int(sum))
else:
sum += math.pow(2, 2 * i - 1)
index.append(int(sum))
return index
[docs]def MakeIndexUptoK(k):
"""
###########################################################################
Generate the index from 1 to k.
###########################################################################
"""
sum = 0
index = [0]
for i in range(1, k + 1):
sum += math.pow(4, i)
index.append(int(sum))
return index
[docs]def MakeIndex(k):
"""
###########################################################################
Generate the index just for k.
###########################################################################
"""
index = [0, int(math.pow(4, k))]
return index
[docs]def MakeKmerVector(seq_list, kmer_list, rev_kmer_list, k, upto, revcomp, normalize):
"""
###########################################################################
Generate kmer vector.
###########################################################################
"""
# Generate the alphabet index.
if upto:
index = MakeIndexUptoK(k)
sum = [0] * k
len_k = k
else:
index = MakeIndex(k)
sum = [0]
len_k = 1
vector = []
for seq in seq_list:
kmer_count = {}
# Generate the kmer frequency vector.
for i in range(len_k):
sum[i] = 0
for j in range(index[i], index[i + 1]):
kmer = kmer_list[j]
temp_count = Frequency(seq, kmer)
# print temp_count
if revcomp:
rev_kmer = FindRevcomp(kmer, {})
if kmer <= rev_kmer:
if kmer not in kmer_count:
kmer_count[kmer] = 0
kmer_count[kmer] += temp_count
else:
if rev_kmer not in kmer_count:
kmer_count[rev_kmer] = 0
kmer_count[rev_kmer] += temp_count
else:
if kmer not in kmer_count:
kmer_count[kmer] = 0
kmer_count[kmer] += temp_count
sum[i] += temp_count
print kmer_count
# Store the kmer frequency vector.
if revcomp:
temp_vec = [kmer_count[kmer] for kmer in rev_kmer_list]
else:
temp_vec = [kmer_count[kmer] for kmer in kmer_list]
# Normalize.
if normalize:
i = 0
if not upto:
temp_vec = [round(float(e)/sum[i], 3) for e in temp_vec]
if upto:
if revcomp:
upto_index = MakeIndexUptoKRevcomp(k)
else:
upto_index = MakeIndexUptoK(k)
j = 0
for e in temp_vec:
if j >= upto_index[i + 1]:
i += 1
temp_vec[j] = round(float(e) / sum[i], 3)
j += 1
vector.append(temp_vec)
# if 0 != len(rev_kmer_list):
# print "The kmer is", rev_kmer_list
# else:
# print "The kmer is", kmer_list
return vector
[docs]def Diversity(vec):
"""
###########################################################################
Calculate diversity.
:param vec: kmer vec
:return: Diversity(X)
###########################################################################
"""
m_sum = sum(vec)
from math import log
return m_sum*log(m_sum, 2) - sum([e*log(e, 2) for e in vec if e != 0])
[docs]def IdXS(vec_x, vec_s, diversity_s):
"""
###########################################################################
Calculate ID(X, S)
:param vec_x: kmer X
:param vec_s: kmer S
:return: ID(X, S) = Diversity(X + S) - Diversity(X) - Diversity(S)
###########################################################################
"""
# print 'vec_x', vec_x
# print 'vec_s', vec_s
vec_x_s = [sum(e) for e in zip(vec_x, vec_s)]
# print 'vec_x_s', vec_x_s
# print diversity(vec_x_s), diversity(vec_x), diversity_s
return Diversity(vec_x_s) - Diversity(vec_x) - diversity_s
##############################################################################
# End of aleeee's change.
##############################################################################
##############################################################################
# MAIN
##############################################################################
if __name__ == '__main__':
# Define the command line usage.
usage = """Usage: fasta2matrix [options] <k> <fasta file>
Options:
-upto Use all values from 1 up to the specified k.
-revcomp Collapse reverse complement counts.
-normalize [frequency|unitsphere] Normalize counts to be
frequencies or project onto unit sphere. With -upto,
normalization is done separately for each k.
-protein Use an amino acid alphabet. Default=ACGT.
-alphabet <string> Set the alphabet arbitrarily.
-mismatch <value> Assign count of <value> to k-mers that
are 1 mismatch away.
-binned <numbins> <file> Create <numbins> vectors for each
sequence, and place each k-mer count
into the bin based upon its corresponding
mean value from the <file>. The
<file> is in FASTA-like format, with
space-delimited numbers in place of
the sequences. The sequences must
have the same names and be in the same
order as the given FASTA file.
-pseudocount <value> Assign the given pseudocount to each bin.
"""
# Define default options.
upto = 0
revcomp = 0
normalize_method = "none"
alphabet = "ACGT"
mismatch = 0
num_bins = 1
pseudocount = 0
number_filename = ""
# Parse the command line.
sys.argv = sys.argv[1:]
while len(sys.argv) > 2:
next_arg = sys.argv[0]
sys.argv = sys.argv[1:]
if next_arg == "-revcomp":
revcomp = 1
elif next_arg == "-upto":
upto = 1
elif next_arg == "-normalize":
normalize_method = sys.argv[0]
sys.argv = sys.argv[1:]
if (normalize_method != "unitsphere") and (normalize_method != "frequency"):
sys.stderr.write("Invalid normalization method (%s).\n"
% normalize_method);
sys.exit(1)
elif next_arg == "-protein":
alphabet = "ACDEFGHIKLMNPQRSTVWY"
elif next_arg == "-alphabet":
alphabet = sys.argv[0]
sys.argv = sys.argv[1:]
elif next_arg == "-mismatch":
mismatch = float(sys.argv[0])
sys.argv = sys.argv[1:]
elif next_arg == "-binned":
num_bins = int(sys.argv[0])
sys.argv = sys.argv[1:]
number_filename = sys.argv[0]
sys.argv = sys.argv[1:]
elif next_arg == "-pseudocount":
pseudocount = int(sys.argv[0])
sys.argv = sys.argv[1:]
else:
sys.stderr.write("Invalid option (%s)\n" % next_arg)
sys.exit(1)
if len(sys.argv) != 2:
sys.stderr.write(usage)
sys.exit(1)
k = int(sys.argv[0])
fasta_filename = sys.argv[1]
# Check for reverse complementing non-DNA alphabets.
if (revcomp == 1) and (alphabet != "ACGT"):
sys.stderr.write("Attempted to reverse complement ")
sys.stderr.write("a non-DNA alphabet (%s)\n" % alphabet)
# Make a list of all values of k.
k_values = []
if upto == 1:
start_i_k = 1
else:
start_i_k = k
k_values = list(range(start_i_k, k + 1))
# If numeric binning is turned on, compute quantile boundaries for various
# values of k.
boundaries = ComputeQuantileBoundaries(num_bins, k_values, number_filename)
# Make a list of all k-mers.
kmer_list = MakeUptoKmerList(k_values, alphabet)
sys.stderr.write("Considering %d kmers.\n" % len(kmer_list))
# Set up a dictionary to cache reverse complements.
revcomp_dictionary = {}
# Use lexicographically first version of {kmer, revcomp(kmer)}.
if revcomp == 1:
kmer_list = MakeRevcompKmerList(kmer_list)
# Print the corner of the matrix.
sys.stdout.write("fasta2matrix")
# Print the title row.
for i_bin in range(1, num_bins + 1):
for kmer in kmer_list:
if num_bins > 1:
sys.stdout.write("\t%s-%d" % (kmer, i_bin))
else:
sys.stdout.write("\t%s" % kmer)
sys.stdout.write("\n")
# Open the sequence file.
if fasta_filename == "-":
fasta_file = sys.stdin
else:
fasta_file = open(fasta_filename, "r")
if number_filename == "":
number_file = 0
else:
number_file = open(number_filename, "r")
# Read the first sequence.
[id, sequence, numbers] = ReadSequenceAndNumbers(fasta_file,
number_filename,
number_file)
# Iterate till we've read the whole file.
i_sequence = 1
while id != "":
# Tell the user what's happening.
if i_sequence % 100 == 0:
sys.stderr.write("Read %d sequences.\n" % i_sequence)
# Compute the sequence vector.
vector = MakeSequenceVector(sequence,
numbers,
num_bins,
revcomp,
revcomp_dictionary,
normalize_method,
k_values,
mismatch,
alphabet,
kmer_list,
boundaries,
pseudocount)
# Print the formatted vector.
sys.stdout.write(id)
for element in vector:
sys.stdout.write("\t%g" % element)
sys.stdout.write("\n")
# Read the next sequence.
[id, sequence, numbers] = ReadSequenceAndNumbers(fasta_file,
number_filename,
number_file)
i_sequence += 1
# Close the file.
fasta_file.close()