import sys
import os
import os.path as op
import gzip
import re
import shlex


class UniParcXMLParser:
    """Stream a UniParc XML file into a set of normalized TSV files.

    Lines are matched against regular expressions rather than parsed with
    a real XML parser, so the input is never validated (see `parse`).
    """

    def __init__(self, file_path, output_dir, writer='csv'):
        self.file_path = file_path
        self.file_handle = self._get_file_handle(file_path)
        #
        if not op.isdir(output_dir):
            raise Exception("`output_dir` must exist!")
        elif os.listdir(output_dir):
            raise Exception("`output_dir` must be empty!")
        self.output_dir = output_dir
        #
        if writer not in ['pandas', 'csv']:
            raise Exception("Only 'csv' and 'pandas' writers are supported!")
        self.writer = writer
        #
        # Rows currently being assembled
        self._uniparc = {'uniparc_id': None}
        self._uniparc_sequence = {'uniparc_id': None}
        self._uniparc_xref = {'uniparc_xref_id': None}
        self._uniparc_xref_prop = {'uniparc_xref_prop_id': None}
        #
        # Auto-incrementing primary keys
        self._uniparc_xref_id = 0
        self._uniparc_xref_prop_id = 0
        #
        # Buffers of finished rows, flushed to disk periodically
        self._uniparc_cache = []
        self._uniparc_sequence_cache = []
        self._uniparc_xref_cache = []
        self._uniparc_xref_prop_cache = []
        #
        self._uniparc_columns = [
            'uniparc_id', 'dataset', 'UniProtKB_exclusion'
        ]
        self._uniparc_sequence_columns = [
            'uniparc_id', 'length', 'checksum', 'sequence'
        ]
        self._uniparc_xref_columns = [
            'uniparc_xref_id', 'uniparc_id', 'type', 'id', 'version_i',
            'active', 'version', 'created', 'last'
        ]
        self._uniparc_xref_prop_columns = [
            'uniparc_xref_prop_id', 'uniparc_xref_id', 'type', 'value'
        ]
        # uniparc_xref_prop
        self._uniparc_xref_props = [
            'ncbi_gi', 'ncbi_taxonomy_id', 'protein_name', 'gene_name',
            'chain', 'uniprot_kb_accession', 'proteome_id', 'component',
        ]
        self._uniparc_xref2prop = {k: [] for k in self._uniparc_xref_props}
        # Per-type value -> id lookups, used only by the work-in-progress
        # `_parse_uniparc_xref_prop_new`; kept under a separate name so they
        # do not clobber the `_uniparc_xref_prop` row dictionary above.
        self._uniparc_xref_prop_values = {k: {} for k in self._uniparc_xref_props}
        self._uniparc_xref_prop_idx = {k: 1 for k in self._uniparc_xref_props}
        #
        self.__parsers = None  # compiled lazily by the `_parsers` property

    def _get_file_handle(self, file_path):
        """Return a file handle for a compressed or uncompressed file."""
        extension = op.splitext(file_path)[-1]
        if extension == '.gz':
            return gzip.open(file_path, mode='rt')
        elif extension == '.bz2':
            raise NotImplementedError
        else:
            return open(file_path)

    @property
    def _file_iterator(self):
        for line in self.file_handle:
            yield line.strip(' \n')
        self.file_handle.close()

    def parse(self):
        """Parse the UniParc XML file in a hacky, non-validating manner."""
        for line in self._file_iterator:
            match = []
            for parser, fn in self._parsers:
                match = parser.findall(line)
                if match:
                    assert len(match) == 1
                    try:
                        fn(match[0])
                    except Exception as e:
                        print(type(e))
                        print(str(e))
                        print(line)
                        print(match)
                    break
            if not match:
                print("Did not match the following line: '{}'\n".format(line))
        # Flush the last chunk
        self._flush_cache()

    @property
    def _parsers(self):
        if self.__parsers is not None:
            return self.__parsers
        self.__parsers = [
            # === UniParc ===
            (re.compile(r'<entry (.*)>'), self._parse_uniparc_start),
            (re.compile(r'<accession>(\w+)</accession>'), self._parse_uniparc_accession),
            (re.compile(r'<sequence (.*)>'), self._parse_uniparc_sequence),
            (re.compile(r'</entry>'), self._parse_uniparc_end),
            # === UniParc XRef ===
            (re.compile(r'<dbReference (.*)>'), self._parse_uniparc_xref),
            (re.compile(r'</dbReference>'), self._parse_uniparc_xref_end),
            # === UniParc XRef Prop ===
            (re.compile(r'<property (.*)/>'), self._parse_uniparc_xref_prop),
            # === Junk ===
            (re.compile(r'<\?xml .*\?>|</?uniparc[^>]*>'), lambda match: None),
        ]
        return self.__parsers
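    # For reference, the tag patterns above assume the line-oriented layout
    # of UniParc XML dumps, roughly as follows (an illustrative sketch with
    # made-up identifiers, not verbatim UniParc output):
    #
    #   <entry dataset="uniparc">
    #   <accession>UPI0000000001</accession>
    #   <dbReference type="EMBL" id="AAA00001" version_i="1" active="N">
    #   <property type="NCBI_GI" value="12345"/>
    #   </dbReference>
    #   <sequence length="14" checksum="0123456789ABCDEF">
    #   MGAAASIQTTVNTL
    #   </sequence>
    #   </entry>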
    def _parse_match(self, match):
        def split_kv(kv):
            k, _, v = kv.partition('=')
            # v = v.strip('"')  # shlex does this already
            return k, v

        # UniParc erroneously(?) escapes double quotes sometimes
        if '\\"' in match:
            match = match.replace('\\"', '"')
        data = dict(split_kv(kv) for kv in shlex.split(match))
        return data

    # === UniParc ===

    def _parse_uniparc_start(self, match):
        assert self._uniparc['uniparc_id'] is None
        #
        data = self._parse_match(match)
        assert all(c in self._uniparc_columns for c in data)
        self._uniparc.update(data)

    def _parse_uniparc_accession(self, match):
        assert match.startswith('UPI')
        self._uniparc['uniparc_id'] = match

    def _parse_uniparc_sequence(self, match):
        assert (self._uniparc['uniparc_id'] is not None and
                self._uniparc_sequence['uniparc_id'] is None)
        self._uniparc_sequence['uniparc_id'] = self._uniparc['uniparc_id']
        #
        data = self._parse_match(match)
        assert all(c in self._uniparc_sequence_columns for c in data)
        self._uniparc_sequence.update(data)
        # The sequence itself spans the following lines, up to the closing tag
        sequence = ''
        for line in self._file_iterator:
            if line != '</sequence>':
                sequence += line.strip()
            else:
                break
        assert sequence.isupper()
        self._uniparc_sequence['sequence'] = sequence
        self._flush_uniparc_sequence()

    def _parse_uniparc_end(self, match):
        assert self._uniparc['uniparc_id'] is not None
        self._flush_uniparc()

    # === UniParc XRef ===

    def _parse_uniparc_xref(self, match):
        do_flush = False
        if match.endswith('/'):
            # Self-closing tag, so no nested property elements follow
            do_flush = True
            match = match.strip('/')
        #
        assert ('uniparc_id' not in self._uniparc_xref and
                self._uniparc['uniparc_id'] is not None)
        self._uniparc_xref['uniparc_id'] = self._uniparc['uniparc_id']
        #
        assert self._uniparc_xref['uniparc_xref_id'] is None
        self._uniparc_xref_id += 1
        self._uniparc_xref['uniparc_xref_id'] = self._uniparc_xref_id
        #
        data = self._parse_match(match)
        assert all(c in self._uniparc_xref_columns for c in data)
        self._uniparc_xref.update(data)
        if do_flush:
            self._flush_uniparc_xref()

    def _parse_uniparc_xref_end(self, match):
        assert 'uniparc_xref_id' in self._uniparc_xref
        self._flush_uniparc_xref()

    # === UniParc XRef Prop ===

    def _parse_uniparc_xref_prop(self, match):
        assert ('uniparc_xref_id' not in self._uniparc_xref_prop and
                self._uniparc_xref['uniparc_xref_id'] is not None)
        self._uniparc_xref_prop['uniparc_xref_id'] = self._uniparc_xref['uniparc_xref_id']
        #
        assert self._uniparc_xref_prop['uniparc_xref_prop_id'] is None
        self._uniparc_xref_prop_id += 1
        self._uniparc_xref_prop['uniparc_xref_prop_id'] = self._uniparc_xref_prop_id
        #
        data = self._parse_match(match)
        assert all(c in self._uniparc_xref_prop_columns for c in data)
        self._uniparc_xref_prop.update(data)
        self._flush_uniparc_xref_prop()

    def _parse_uniparc_xref_prop_new(self, match):
        """Work in progress..."""
        assert ('uniparc_xref_id' not in self._uniparc_xref_prop and
                self._uniparc_xref['uniparc_xref_id'] is not None)
        data = self._parse_match(match)
        try:
            data['value_id'] = self._uniparc_xref_prop_values[data['type']][data['value']]
        except KeyError:
            self._uniparc_xref_prop_values[data['type']][data['value']] = (
                self._uniparc_xref_prop_idx[data['type']]
            )
            self._uniparc_xref_prop_idx[data['type']] += 1
            data['value_id'] = self._uniparc_xref_prop_values[data['type']][data['value']]
        self._uniparc_xref2prop[data['type']].append(
            (self._uniparc_xref['uniparc_xref_id'], data['value_id']))
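    # The flush methods in the output section below append finished rows to
    # per-table buffers and periodically write them out as TSV. The intended
    # relational layout, inferred from the column lists in `__init__`:
    #
    #   uniparc       1 --- 1  uniparc_sequence   (joined on uniparc_id)
    #   uniparc       1 --- N  uniparc_xref       (joined on uniparc_id)
    #   uniparc_xref  1 --- N  uniparc_xref_prop  (joined on uniparc_xref_id)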
    # === Output ===

    def _flush_uniparc(self):
        if len(self._uniparc_cache) > 100:
            self._flush_cache()
        self._uniparc_cache.append(self._uniparc)
        self._uniparc = {'uniparc_id': None}

    def _flush_uniparc_sequence(self):
        self._uniparc_sequence_cache.append(self._uniparc_sequence)
        self._uniparc_sequence = {'uniparc_id': None}

    def _flush_uniparc_xref(self):
        self._uniparc_xref_cache.append(self._uniparc_xref)
        self._uniparc_xref = {'uniparc_xref_id': None}

    def _flush_uniparc_xref_prop(self):
        self._uniparc_xref_prop_cache.append(self._uniparc_xref_prop)
        self._uniparc_xref_prop = {'uniparc_xref_prop_id': None}

    def _flush_cache(self):
        """Flush cached data to files."""
        caches = [
            ('uniparc.tsv', self._uniparc_cache, self._uniparc_columns),
            ('uniparc_sequence.tsv', self._uniparc_sequence_cache,
             self._uniparc_sequence_columns),
            ('uniparc_xref.tsv', self._uniparc_xref_cache,
             self._uniparc_xref_columns),
            ('uniparc_xref_prop.tsv', self._uniparc_xref_prop_cache,
             self._uniparc_xref_prop_columns),
        ] + [
            # Populated only by the work-in-progress `_parse_uniparc_xref_prop_new`
            ('uniparc_xref2{}.tsv'.format(xref), self._uniparc_xref2prop[xref],
             self._uniparc_xref_prop_columns)
            for xref in self._uniparc_xref_props
        ]
        for filename, cache, columns in caches:
            self._append_to_file(filename, cache, columns)
            del cache[:]

    def _append_to_file(self, filename, data, columns):
        if self.writer == 'pandas':
            self._append_to_file_pandas(filename, data, columns)
        elif self.writer == 'csv':
            self._append_to_file_csv(filename, data, columns)
        else:
            raise Exception("Unsupported writer: '{}'".format(self.writer))

    def _append_to_file_pandas(self, filename, data, columns):
        import pandas as pd
        file_path = op.join(self.output_dir, filename)
        df = pd.DataFrame(data, columns=columns)
        write_header = not op.isfile(file_path)
        with open(file_path, 'a+') as ofh:
            df.to_csv(ofh, sep='\t', na_rep='\\N', index=False, header=write_header)

    def _append_to_file_csv(self, filename, data, columns):
        import csv
        file_path = op.join(self.output_dir, filename)
        csv_writer_kwargs = {}
        if sys.version_info >= (3, 0):
            csv_writer_kwargs['newline'] = ''
        if not op.isfile(file_path):
            # Add header
            data.insert(0, {c: c for c in columns})
        with open(file_path, 'a+', **csv_writer_kwargs) as ofh:
            writer = csv.writer(
                ofh, delimiter='\t', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
            for row_dict in data:
                assert not set(row_dict) - set(columns), (
                    set(row_dict) - set(columns), filename)
                row = [row_dict.get(c, '\\N') for c in columns]
                writer.writerow(row)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--file_path', type=str)
    parser.add_argument('--output_dir', type=str)
    args = parser.parse_args()

    hacky_xml_parser = UniParcXMLParser(args.file_path, args.output_dir)
    hacky_xml_parser.parse()
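# A minimal usage sketch (file names are assumptions: `uniparc_all.xml.gz` is
# the name UniProt uses for the full UniParc dump, and `uniparc_xml_parser.py`
# stands in for whatever this module is saved as):
#
#   mkdir output
#   python uniparc_xml_parser.py --file_path uniparc_all.xml.gz --output_dir output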