Source code for pulsarpy_dx.utils

#!/usr/bin/env python3

###
#Nathaniel Watson
#Stanford School of Medicine
#Nov. 6, 2018
#nathankw@stanford.edu
###

import pdb

from pulsarpy_dx import logger
from pulsarpy import models
from pulsarpy.elasticsearch_utils import MultipleHitsException
import scgpm_seqresults_dnanexus.dnanexus_utils as du 

[docs]class BarcodeNotSet(Exception): """ Raised when a barcode (paired or single-end) is expected to be set on a Library record but isn't. """
[docs]class MissingSequencingRequest(Exception): """ Raised when a SequencingRequest was expected to be found in Pulsar but was not. """
[docs]def get_or_create_srun_by_ids(sreq_id, dx_project_id): """ A wrapper over get_or_create_srun() below that simplifies the parameters to use IDs instead of objects. Args: sreq_id: `int`. A Pulsar SequencingRequest record ID. dx_project_id: `str`. The project ID of a DNAnexus project, i.e. FPg8yJQ900P4ZgzxFZbgJZY2. Returns: `pulsarpy.models.SequencingRun` instance. """ sreq = models.SequencingRequest(sreq_id) dxres = du.DxSeqResults(dx_project_id=dx_project_id) return get_or_create_srun(sreq, dxres)
[docs]def get_or_create_srun(sreq, dxres): """ Checks whether a given SequencingRequest record already has a SequencingRun record for a particular DNAnexus project. This check is satisfied if either of the following are true: 1. There is a SequencingRun whose name attribute is equal to the value of the DNAnexus project's name (case-insensitive). 2. There is a SequencingRun with an associated DataStorage whose project_identifier attribute is equal to the project ID if the DNAnexus project. If such a SequencingRun record exists, it is returned, otherwise a new SequencingRun record based off of the provided DNAnexus sequencing results is created and then returned. Args: sreq: `pulsarpy.models.SequencingRequest` instance. dxres - `scgpm_seqresults_dnanexus.dnanexus_utils.du.DxSeqResults()` instance that contains sequencing results metadata from DNAnexus that represents a sequencing run of the given `pulsarpy.models.SequencingRequest`. Returns: `pulsarpy.models.SequencingRun` instance. """ dx_proj_name = dxres.dx_project_name.strip().lower() srun_ids = sreq.sequencing_run_ids if srun_ids: for i in srun_ids: srun = models.SequencingRun(i) # Check by name, case-insensitive. if srun.name.strip().lower() == dx_proj_name: return srun # Also check by DataStorage elif srun.data_storage_id: ds = models.DataStorage(srun.data_storage_id) if ds.project_identifier == dxres.dx_project_id: return srun # Create SequencingRun srun_json = create_srun(sreq, dxres) srun = models.SequencingRun(srun_json["id"]) return srun
[docs]def create_srun(sreq, dxres): """ Creates a SequencingRun record based on the provided DNAnexus sequencing results, to be linked to the given SequencingRequest object. Note that I would also like to try and set the attributes `SequencingRun.forward_read_len` and `SequencingRun.reverse_read_len`, however, I can't obtain these results from DNAnexus based on the existing metadata that's sent there via GSSC. Args: sreq: A `pulsarpy.models.SequencingRequest` instance. dxres: `scgpm_seqresults_dnanexus.dnanexus_utils.du.DxSeqResults()` instance that contains sequencing results metadata in DNAnexus for the given srun. """ logger.debug("Creating SequencingRun and associated DataStorage") data_storage_json = create_data_storage(dxres) payload = {} payload["name"] = dxres.dx_project_name.strip() payload["sequencing_request_id"] = sreq.id payload["status"] = "finished" payload["data_storage_id"] = data_storage_json["id"] payload["lane"] = dxres.dx_project_props["seq_lane_index"] logger.debug("Creating SequencingRun record.") return models.SequencingRun.post(payload)
[docs]def create_data_storage(dxres): """ Creates a DataStorage record for the given DNAnexus sequencing results. Args: dxres: `scgpm_seqresults_dnanexus.dnanexus_utils.du.DxSeqResults()` instance that contains sequencing results metadata in DNAnexus for the given srun. Returns: `dict`. The response from the server containing the JSON serialization of the new DataStorage record. """ logger.debug("In create_data_storage().") payload = {} payload["name"] = dxres.dx_project_name exists = models.DataStorage.find_by(payload=payload) if exists: return exists payload["project_identifier"] = dxres.dx_project_id payload["data_storage_provider_id"] = models.DataStorageProvider("DNAnexus").id # Create DataStorage res_json = models.DataStorage.post(payload) return res_json
[docs]def check_pairedend_correct(sreq, dx_pe_val): """ Checks whether the SequencingRequest.paired_end attribute and the 'paired' property of the DNAnexus project in question are in accordance. It's possible that the request originally went in as SE (or the tech forgot to check PE), but the sequencing run was acutally done PE. If this is the case, then the SequencingRequest.paired_end attribute will be updated to be set to True in order that PE sequencing results will be allowed (PE attributes of a SequencingResult will be hidden in the UI if the SequencingRequest is set to paired_end being false). Args: sreq: A `pulsarpy.models.SequencingRequest` instance. dx_pe_val: `str`. The value of the 'paired' property of the DNAnexus project in questions. """ if sreq.paired_end == False: if dx_pe_val == "true": sreq.patch({"paired_end": True})
[docs]def import_dx_project(dx_project_id): """ Attemps to import DNAnexus sequencing results for the given DNAnexus project ID. This entails having a SequencingRequest object in Pulsar that in turn has a SequecingRun object to import the results into, creating SequencingResult objects in the process. Thus, we must first try to find the appropriate SequencingRequest if it exists. If it doesn't, an Exception will be raised. We first try to find the SequencingRequest by matching the value of its name attribute to the DNAnexus project's library_name property value. Normally, when provinding the sequencing center a name for their library to be sequenced, the lab uses the record ID of the SequencingRequest object, which is the concatenation of the model abbreviation in Pulsar, a hyphen, and the record's primary ID (i.e. SREQ-25). Normally, we could just search by the integer portion on the primary ID field. However, SequencingRequests from the old Syapse LIMS have been backported into Pulsar, and they used the same record ID forming convention there too. So for these records, the Syaspe record ID has been added into a Pulsar SequencingRequest via the name attribute. Thus, as a precaution, a SequencingRequest is first searched on its name attribute. If that fails, then the SequencingRequests are searched on the primary ID attribute using only the interger portion of the DNAnexus project's library_name property. Raises: `pulsarpy.elasticsearch_utils.MultipleHitsException`: Multiple SequencingRequest records were found in searching by name in pulsarpy.models.Model.replace_name_with_id(). `MissingSequencingRequest`: A relevant SequencingRequest record to import the DNAnexus sequencing results into could not be found. `BarcodeNotSet`: A library on the SequencingRequest object at hand does not have a barcode set, making it impossible to import sequening results from DNAnexus for it. `scgpm_seqresults_dnanexus.dnanexus_utils.FastqNotFound`: There aren't any FASTQ files in the DNAnexus project for a given Library, based on the barcode specified for that Library. """ dxres = du.DxSeqResults(dx_project_id=dx_project_id) logger.debug("Preparing to import DNAnexus sequencing results for {} ({}).".format(dx_project_id, dxres.dx_project_name)) # A pulsarpy.models.DxMissingLibraryNameProperty Exception is raised if library_name property # is not present in DNAnexus project. lib_name_prop = dxres.library_name logger.debug("DNAnexus library_name property value: {}.".format(lib_name_prop)) #sreq = models.SequencingRequest.find_by(payload={"name": lib_name_prop}) # Using Elasticsearch here mainly in order to achieve a case-insensitive search on the SequencingRequest # name field. logger.debug("Searching Pulsar for matching SequencingRequest record.") try: # If lib_name_prop is empty, then search below will fail with message of: # 'ValueError: Either the 'uid' or 'upstream' parameter must be set'. sreq = models.SequencingRequest(lib_name_prop) except MultipleHitsException as e: # raised in pulsarpy.models.Model.replace_name_with_id() logger.error("Found multiple SequencingRequest records with name '{}'. Skipping DNAnexus project {} ({}) with library_name property set to '{}'".format(lib_name_prop, t, dxres.name)) raise except models.RecordNotFound as e: # raised in pulsarpy.models.Model.replace_name_with_id() # Search by ID. The lab sometimes doesn't add a value for SequencingRequest.name and # instead uses the SequencingRequest record ID, which is a concatenation of the model # abbreviation, a hyphen, and the records primary ID. try: if not lib_name_prop.lower().startswith("sreq-"): # Don't know what this DNAnexus project is form than; ignore; raise models.RecordNotFound else: sreq = models.SequencingRequest(lib_name_prop.lower().lstrip("sreq-")) except models.RecordNotFound: msg = "Can't find Pulsar SequencingRequest for DNAnexus project {} ({}) with library_name property set to '{}'.".format(dx_project_id, dxres.dx_project_name, lib_name_prop) logger.error(msg) raise MissingSequencingRequest(msg) if "paired_end" in dxres.dx_project_props: check_pairedend_correct(sreq, dxres.dx_project_props["paired_end"]) logger.debug("Found SequencingRequest {}.".format(sreq.id)) srun = get_or_create_srun(sreq, dxres) logger.debug("SequencingRun record is: {}.".format(srun.id)) # Check if DataStorage is aleady linked to SequencingRun object. May be if user created it # manually in the past. if not srun.data_storage_id: ds_json = create_data_storage(dxres) srun.patch({"data_storage_id": ds_json["id"], "status": "finished"}) # Create SequencingResult record for each library on the SReq for library_id in sreq.library_ids: library = models.Library(library_id) barcode = library.get_barcode_sequence() if not barcode: msg = "Library {} does not have a barcode set.".format(library_id) logger.error(msg) raise BarcodeNotSet(msg) import_library(srun_id=srun.id, barcode=barcode, dxres=dxres)
def import_library(srun_id, barcode, dxres): srun = models.SequencingRun(srun_id) sreq = models.SequencingRequest(srun.sequencing_request_id) lib_bcseq_hash = sreq.get_library_barcode_sequence_hash(inverse=True) library_id = lib_bcseq_hash[barcode] library = models.Library(library_id) # Check if SequencingResult record for given library already exists. if library_id in srun.library_sequencing_results(): return payload = {} payload["mapper"] = "bwa" payload["sequencing_run_id"] = srun.id payload["library_id"] = library_id # Find the barcode file on DNAnexus logger.debug("Processing Library {} ({}) with barcode {}.".format(library.name, library_id, barcode)) try: logger.debug("Locating sequencing files for Library {}, barcode {}.".format(library_id, barcode)) barcode_files = dxres.get_fastq_files_props(barcode=barcode) except du.FastqNotFound as e: logger.error(e.args) raise #return # Above - keys are the FASTQ file DXFile objects; values are the dict of associated properties # on DNAnexus on the file. In addition to the properties on the file in DNAnexus, an # additional property is present called 'fastq_file_name'. # Read barcode_stats.json to get mapped read counts for the given barcode: #barcode_stats = dxres.get_barcode_stats_json(barcode=barcode) logger.debug("Download alignment summary metrics for barcode {}.".format(barcode)) #### Get Picard's Alignment summary metrics # dxres.get_alignment_summary_metrics() raises a scgpm_seqresults_dnanexus.dnanexus_utils.DxMissingAlignmentSummaryMetrics # exception if a Picard alignment summary metrics file couldn't be found. try: asm = dxres.get_alignment_summary_metrics(barcode=barcode) except du.DxMissingAlignmentSummaryMetrics: # GSSC doesn't do any analysis for NovaSeq runs. asm = None for dxfile in barcode_files: file_id = dxfile.id props = barcode_files[dxfile] read_num = props.get("read", None) if read_num: read_num = int(read_num) else: fastq_file_name = props["fastq_file_name"] if "_R1" in fastq_file_name: read_num = 1 elif "_R2" in fastq_file_name: read_num = 2 if not read_num in [1, 2]: raise Exception("Unknown read number '{}'. Should be either 1 or 2.".format(read_num)) if read_num == 1: payload["read1_uri"] = file_id else: payload["read2_uri"] = file_id if asm: if sreq.paired_end: payload["pair_aligned_perc"] = round(float(asm["PAIR"]["PCT_READS_ALIGNED_IN_PAIRS"]) * 100, 2) if read_num == 1: metrics = asm["FIRST_OF_PAIR"] payload["read1_count"] = metrics["PF_READS"] payload["read1_aligned_perc"] = round(float(metrics["PCT_PF_READS_ALIGNED"]) * 100, 2) else: metrics = asm["SECOND_OF_PAIR"] payload["read2_count"] = metrics["PF_READS"] payload["read2_aligned_perc"] = round(float(metrics["PCT_PF_READS_ALIGNED"]) * 100, 2) models.SequencingResult.post(payload)