In [1]:
%%capture
# load the magic extension and imports
%reload_ext nextcode
import pandas as pd
import GOR_query_helper as GQH
%env LOG_QUERY=1

project = "bch_connect_hg38"
%env GOR_API_PROJECT={project}

Parallel GORpipe queries with PARTGOR

This notebook explains advanced parallel concepts in GOR, beyond the standard PGOR macro command used to split data processing across the genome. GOR files/tables are always consistently ordered to enable rapid data access based on genomic position. Additionally, GOR allows data tables to be partitioned, to provide fast access to subsets of data based on tags (keys), using so-called GOR dictionaries. Such partitions exist on two levels: individual tags, and buckets that store data for multiple tags. This data architecture allows for very large tables that can be updated efficiently and queried efficiently with parallel processing. Examples of such tables are variants or sequence read coverage at the patient-sample level, or GWAS analyses partitioned by phenotype.
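As a minimal sketch of how such a dictionary is read (the sample names below are hypothetical), the -s option exposes the tag as a column and the -f option filters on the requested tags so that only the relevant partitions need to be read:

    gor source/var/wgs_varcalls.gord -s PN -f 'SAMPLE_A,SAMPLE_B,SAMPLE_C'
    | group chrom -gc PN -count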

Similar to PGOR, PARTGOR is a macro command that makes it easy to set up parallel queries that recognize the partition setup of dictionaries. The following examples will explore this.
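In its simplest form, a PARTGOR create statement looks roughly like this (##mydict## and #result# are placeholder names; #{tags} is replaced by the comma-separated list of tags in each partition):

    create #result# = partgor -dict ##mydict##
    <(gor ##mydict## -f #{tags} | group chrom -gc PN -count);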

Variant count

First, we use it to get information about the variants from all the samples in a single gene.

In [2]:
qh = GQH.GOR_Query_Helper()
mydefs = qh.add_many("""
def ##wgsvars## = source/var/wgs_varcalls.gord -s PN;
def ##segcov## = source/cov/segment_cov.gord -s PN;
""")

We can explore the metadata in ##wgsvars## by using the -asdict option with the NOR command:

In [3]:
%%gor
$mydefs
nor -asdict ##wgsvars## | colsplit #1 2 x -s '\|' | group -gc x_2 -count | granno -sum -ic allcount -count
Query ran in 1.16 sec
Query fetched 152 rows in 0.58 sec (total time 1.73 sec)
                                             
Out[3]:
x_2 allCount allCountx sum_allCount
0 NaN 5 152 24099
1 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 200 152 24099
2 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 200 152 24099
3 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 200 152 24099
4 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 200 152 24099
... ... ... ... ...
147 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 100 152 24099
148 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 100 152 24099
149 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 20 152 24099
150 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 54 152 24099
151 .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... 20 152 24099

152 rows × 4 columns

We see that just over 24 thousand samples have been imported into roughly 150 buckets, most of which hold 200 samples.

Now we create a temp table with all the genes of interest and join them with the variants:

In [4]:
mydefs = qh.add("""create #mygenes# = gor #genes# | where gene_symbol in ('BRCA2')""")
In [5]:
%%gor
$mydefs
gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars##)
| group 1 -gc reference,call,callcopies -count
Query ran in 6.17 sec.
Query fetched 5,052 rows in 36.94 sec (total time 43.11 sec)
                                             
Out[5]:
CHROM POS Reference Call CallCopies allCount
0 chr13 32315226 G A 1 461
1 chr13 32315226 G A 2 54
2 chr13 32315300 G A 1 9
3 chr13 32315336 T C 1 1
4 chr13 32315382 A T 1 1
... ... ... ... ... ... ...
5047 chr13 32400114 G A 1 3
5048 chr13 32400151 T A 1 618
5049 chr13 32400151 T A 2 826
5050 chr13 32400200 A G 1 1
5051 chr13 32400233 C T 1 1

5052 rows × 6 columns

While the above query, which processed all the variants of 24k samples in BRCA2, ran fairly quickly, we can execute it faster with parallel processing. Splitting the work into multiple small regions over BRCA2 would actually create significant overhead for each worker, because of the fixed seek time per file (here about 150 buckets, since we are not filtering on samples). Instead, we will parallelize over the sample partitions using PARTGOR.

In [6]:
mydefs = qh.add("""
create #partcount# = partgor -dict ##wgsvars##
<(gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count);
""")
In [7]:
%%gor
$mydefs
gor [#partcount#] | granno 1 -gc #3,#4 -count | top 200
Query ran in 4.56 sec.
Query fetched 200 rows in 4.06 sec (total time 8.62 sec)
                                             
Out[7]:
CHROM POS Reference Call CallCopies allCount allCountx
0 chr13 32315226 G A 1 4 121
1 chr13 32315226 G A 1 5 121
2 chr13 32315226 G A 1 2 121
3 chr13 32315226 G A 1 4 121
4 chr13 32315226 G A 2 2 121
... ... ... ... ... ... ... ...
195 chr13 32315655 A G 1 1 117
196 chr13 32315655 A G 1 6 117
197 chr13 32315655 A G 2 2 117
198 chr13 32315655 A G 1 4 117
199 chr13 32315655 A G 2 1 117

200 rows × 7 columns

Tip: For efficient parallel processing, it is important not to use too many partitions. By default, PARTGOR will use the bucket sizes in the dictionary specified with -dict to determine the partition size. One can modify the default with the options -partscale, -parts, or -partsize. The underlying buckets in the dictionary will still be used to optimize which tags are grouped into each partition, as specified with the #{tags} variable in the macro command.
Alert: The command executed by the PARTGOR macro is specified with a nested query <(...) and must include the #{tags} variable; otherwise there is no filtering and multiple identical queries are executed.
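Conceptually, PARTGOR expands the nested query once per partition, replacing #{tags} with that partition's comma-separated list of sample tags. With hypothetical PN names, the first two expansions of the query above would look roughly like this:

    gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f 'PN_0001,PN_0002,...,PN_0200')
    | group 1 -gc reference,call,callcopies -count

    gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f 'PN_0201,PN_0202,...,PN_0400')
    | group 1 -gc reference,call,callcopies -count

The output of each expansion is then written as a separate entry into the #partcount# dictionary.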

We see that we get multiple rows for each variant, with sample counts from each partition for the common variants. Therefore, we need to sum up the per-partition counts ourselves, because GOR does not automatically combine distributive aggregates across partitions. We will now set the partition size explicitly and sum up the counts:

In [8]:
mydefs = qh.add("""
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count);
""")
In [9]:
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
Query ran in 1.56 sec
Query fetched 5,052 rows in 1.18 sec (total time 2.73 sec)
                                             
Out[9]:
CHROM POS Reference Call CallCopies gtCount
0 chr13 32315226 G A 1 461
1 chr13 32315226 G A 2 54
2 chr13 32315300 G A 1 9
3 chr13 32315336 T C 1 1
4 chr13 32315382 A T 1 1
... ... ... ... ... ... ...
5047 chr13 32400114 G A 1 3
5048 chr13 32400151 T A 1 618
5049 chr13 32400151 T A 2 826
5050 chr13 32400200 A G 1 1
5051 chr13 32400233 C T 1 1

5052 rows × 6 columns

Accessing more than one table

Now we will extend the query above and use the sequence read coverage from each sample to understand the data quality at positions where a variant may be absent in a sample. We will use the allvars.gord table in the VEP folder to list all possible variants within the gene. Then we will left-join the variant data at the sample level together with the coverage data.

In [10]:
mydefs = qh.add_many("""
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count
);
""")
In [11]:
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
    
Query ran in 2.55 sec
Query fetched 22,744 rows in 40.65 sec (total time 43.20 sec)
                                             
Out[11]:
CHROM POS Reference Call CallCopies gtCount
0 chr13 32315226 G A 1 461
1 chr13 32315226 G A 2 54
2 chr13 32315226 G A miss 23584
3 chr13 32315300 G A 1 9
4 chr13 32315300 G A miss 24090
... ... ... ... ... ... ...
22739 chr13 32400151 T A miss 22655
22740 chr13 32400200 A G 1 1
22741 chr13 32400200 A G miss 24098
22742 chr13 32400233 C T 1 1
22743 chr13 32400233 C T miss 24098

22744 rows × 6 columns

Note: Here we are using #{tags} twice: first to generate one row per sample for each variant, so that the left-join logic can be applied at the sample level, and second to filter the dictionary table ##wgsvars## such that only data from the necessary partitions is read. Replacement of #{tags} is greedy, even though it is placed within quotes.
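For illustration, within one partition the first use of #{tags} expands roughly like this (hypothetical sample names), duplicating every variant row once per sample in the partition before the left-join:

    | calc pn 'PN_0001,PN_0002,PN_0003' | split pn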

We see that for most of the variants, the callcopies count indicates that they are missing. Because the variants in ##wgsvars## are sparse, we have to use the sequence read coverage to separate the homozygous reference state from the absence of a variant due to lack of data. Therefore, we join in the coverage information for each sample:

In [12]:
mydefs = qh.add("""
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| join -snpseg -maxseg 10000 -l -e 0 -xl PN -xr PN -rprefix Cov <(gor ##segcov## -f #{tags})
| replace callcopies if(callcopies!='miss',callcopies,if(Cov_depth > 5,'0','3'))
| group 1 -gc reference,call,callcopies -count
);
""")
In [13]:
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
    
Query ran in 5.70 sec.
Query fetched 40,428 rows in 97.91 sec (total time 103.60 sec)
                                             
Out[13]:
CHROM POS Reference Call CallCopies gtCount
0 chr13 32315226 G A 0 1245
1 chr13 32315226 G A 1 461
2 chr13 32315226 G A 2 54
3 chr13 32315226 G A 3 22339
4 chr13 32315300 G A 0 1790
... ... ... ... ... ... ...
40423 chr13 32400200 A G 1 1
40424 chr13 32400200 A G 3 22429
40425 chr13 32400233 C T 0 1657
40426 chr13 32400233 C T 1 1
40427 chr13 32400233 C T 3 22441

40428 rows × 6 columns

Note: In the above query, we are using #{tags} three times, twice to filter tables. Because the PARTGOR -dict option refers to ##wgsvars##, that dictionary guides how the tags are grouped into partitions. Assuming that ##segcov## is populated in concert with ##wgsvars##, its bucket structure should be identical; this is not strictly required, but a mismatch will lead to worse performance. Also, rather than filtering with the -f option, it is possible to use -ff with a nested query that fetches samples outside of the partition, e.g. to get all the members needed for an inheritance check, who may be stored in different partitions, as sketched below.
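A rough sketch of that -ff pattern, assuming a hypothetical relation #trio_members# with columns index_PN and member_PN, and assuming -ff reads its tags from the first column of the nested query. The inner source then also returns variants for family members that live in other buckets or partitions:

    gor ##wgsvars## -ff <(nor [#trio_members#]
                          | calc tag '#{tags}' | split tag
                          | where tag = index_PN
                          | select member_PN)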

The query above uses a first-principles approach to perform a simplistic "joint-genotyping". GOR does have much more efficient commands for this, namely GTGEN and PRGTGEN. However, this approach is still reasonably fast, as shown here for over 20k samples and 17k variants. We can, for instance, calculate the allele frequency like this:

In [14]:
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
| pivot callcopies -v '0','1','2','3' -gc #3,#4 -e 0
| prefix call[+1]- x
| calc AN round(2.0*(float(x_0_gtcount)+float(x_1_gtcount)+float(x_2_gtcount)))
| calc AC x_1_gtcount+2*x_2_gtcount
| calc AF form(AC/float(AN),6,5)
| hide x_*
| granno chrom -count
| where AN > 40000
Query ran in 0.78 sec
Query fetched 14,684 rows in 1.40 sec (total time 2.18 sec)
                                             
Out[14]:
CHROM POS Reference Call AN AC AF allCount
0 chr13 32316333 CAA C 43358 1 0.00002 17684
1 chr13 32316345 C A 44856 2 0.00004 17684
2 chr13 32316359 A G 46436 1 0.00002 17684
3 chr13 32316360 C A 46560 1 0.00002 17684
4 chr13 32316361 C A 46624 1 0.00002 17684
... ... ... ... ... ... ... ... ...
14679 chr13 32398918 G A 42656 2 0.00005 17684
14680 chr13 32398926 C A 41886 1 0.00002 17684
14681 chr13 32398926 C T 41886 4 0.00010 17684
14682 chr13 32398927 G A 41670 2 0.00005 17684
14683 chr13 32398933 C T 40768 1 0.00002 17684

14684 rows × 8 columns

Note: In the above queries, we are joining two tables based on spatial overlap of variants and segments, as well as using an equi-join on the sample ids, PN. To do that efficiently, GOR uses merge-hash logic. By using PARTGOR with a partition size of 1000, the memory requirements are more than 20x smaller than if we did not use partition parallelization and worked with all 24k samples in a single partition, since each worker only keeps hash entries for at most 1000 samples rather than roughly 24 thousand.

Working with a subset of samples (tags)

Before we end this demonstration, we will show how we can calculate the AF for a subset of the samples, e.g. based on HPO phenotypes.

In [15]:
%%gor dfSamples <<
nor SubjectReports/Phenotypes.rep
| where description in ('Abnormality of head or neck',
                        'Abnormality of the face',
                        'Abnormality of the head') and present='Y'
| group -gc pn -set -sc code
| where listsize(set_code)=3
| select pn
Query ran in 9.16 sec.
Query fetched 2,042 rows in 2.69 sec (total time 11.85 sec)
                                             
In [16]:
dfSamples.shape
Out[16]:
(2042, 1)

We see that we have just over 2000 samples with all three HPO phenotypes present. Below, we define ##myPNs## using a federated approach, i.e. as the Pandas dataframe dfSamples.

In [17]:
mydefs = qh.add_many("""
create ##myPNs## = nor [var:dfSamples];
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partscale 2 -ff [##myPNs##]
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| join -snpseg -maxseg 10000 -l -e 0 -xl PN -xr PN -rprefix Cov <(gor ##segcov## -f #{tags})
| replace callcopies if(callcopies!='miss',callcopies,if(Cov_depth > 5,'0','3'))
| group 1 -gc reference,call,callcopies -count
);
""")
In [18]:
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
| pivot callcopies -v '0','1','2','3' -gc #3,#4 -e 0
| prefix call[+1]- x
| calc AN round(2.0*(float(x_0_gtcount)+float(x_1_gtcount)+float(x_2_gtcount)))
| calc AC x_1_gtcount+2*x_2_gtcount
| calc AF form(AC/float(AN),6,5)
| hide x_*
| granno chrom -count
| where AN > 4000
Query ran in 0.07 sec
Loading relations [var:dfSamples] from local state
Query ran in 0.64 sec
Query fetched 14,386 rows in 22.01 sec (total time 22.65 sec)
                                             
Out[18]:
CHROM POS Reference Call AN AC AF allCount
0 chr13 32316367 C T 4008 0 0.00000 17684
1 chr13 32316370 A G 4026 1 0.00025 17684
2 chr13 32316386 C G 4070 0 0.00000 17684
3 chr13 32316388 C T 4070 0 0.00000 17684
4 chr13 32316393 T G 4074 0 0.00000 17684
... ... ... ... ... ... ... ... ...
14381 chr13 32398878 A G 4078 0 0.00000 17684
14382 chr13 32398900 T A 4038 0 0.00000 17684
14383 chr13 32398901 C T 4036 0 0.00000 17684
14384 chr13 32398904 C T 4018 0 0.00000 17684
14385 chr13 32398905 G A 4016 1 0.00025 17684

14386 rows × 8 columns

Mixing together PARTGOR and PGOR

We can apply parallelism in two dimensions simultaneously, i.e. along the partition axis with PARTGOR and along the genomic axis with PGOR. To do this, we apply PGOR within the PARTGOR macro command. A practical case is when we want to count the number of variants in a project:

In [26]:
qh2 = GQH.GOR_Query_Helper()
mydefs2 = qh2.add_many("""
def #wgsvars# = source/var/wgs_varcalls.gord -s PN;
create #temp# = partgor -dict #wgsvars# -partscale 10 <(pgor #wgsvars# -f #{tags} | group chrom -gc pn -count);
""")
In [27]:
%%gor
$mydefs2
nor -asdict [#temp#] | calc file_ending right(#1,40)
Query ran in 5.11 sec.
Query fetched 13 rows in 575.11 sec (total time 580.22 sec)
                                             
Out[27]:
col1 col2 file_ending
0 ../../../../cache/result_cache/cache01/eak/eak... 1 e01/eak/eak5njnebc4d9f0ggefgoc1kbgk.gord
1 ../../../../cache/result_cache/cache01/138/138... 6 e01/138/1389320co9icjdh6od3domcd37c.gord
2 ../../../../cache/result_cache/cache01/d3i/d3i... 9 e01/d3i/d3ihdl3e80akj3gj5b5cl2n3b5e.gord
3 ../../../../cache/result_cache/cache01/1p0/1p0... 11 e01/1p0/1p01a31i559nn23a4cm0d05e62a.gord
4 ../../../../cache/result_cache/cache01/d38/d38... 3 e01/d38/d38fi2m4n3737ol9j61hjib2ol9.gord
5 ../../../../cache/result_cache/cache01/jkb/jkb... 7 e01/jkb/jkba4ebgabfmnhmghi1fp70fido.gord
6 ../../../../cache/result_cache/cache01/78f/78f... 2 e01/78f/78f0np9ed5o8hf16jdm56o2enao.gord
7 ../../../../cache/result_cache/cache01/104/104... 10 01/104/104aclcl3oom29o58mhl9ka76hl2.gord
8 ../../../../cache/result_cache/cache01/kgc/kgc... 5 e01/kgc/kgcdmdhnke4ailg3fa2eo66c8ic.gord
9 ../../../../cache/result_cache/cache01/9lf/9lf... 13 e01/9lf/9lf3dk95j8nfal8pha7fn4mb53e.gord
10 ../../../../cache/result_cache/cache01/6ho/6ho... 4 e01/6ho/6ho41hdk6f6hd8a6dl1i3ncdomd.gord
11 ../../../../cache/result_cache/cache01/10h/10h... 8 01/10h/10hn6nakh9anm4ohp1el52ga9ko8.gord
12 ../../../../cache/result_cache/cache01/g71/g71... 12 e01/g71/g71k5e7la2mgflg550a7b634j8e.gord

We see that the temp dictionary #temp# indeed consists of multiple dictionary lines, one per partition, each of which represents the output of a PGOR parallel execution. Using a -partscale of 10, the above query generated over 300 parallel queries to count the variants per PN across the genome; a -partscale of 1 would have resulted in 3000 queries and the same number of temporary files. With the longest partitions taking about 5 minutes to run, the overall query time was about 10 minutes with 100 workers. We can now sum up the total counts:

In [39]:
%%gor
$mydefs2
nor [#temp#] | group -gc pn -sum -ic allcount | granno -sum -avg -min -max -ic sum_allcount 
| replace avg_sum_allCount int(float(#rc))
Query ran in 1.02 sec
Query fetched 24,099 rows in 1.27 sec (total time 2.29 sec)
                                             
Out[39]:
PN sum_allCount min_sum_allCount max_sum_allCount avg_sum_allCount sum_sum_allCount
0 BCH_00HI1377A_1_38E_BC38 93044 233 6339576 455953 10988032042
1 BCH_019-III-01-P2_1_38E_BC38 132944 233 6339576 455953 10988032042
2 BCH_01HI1915A_1_38E_BC38 97958 233 6339576 455953 10988032042
3 BCH_01HI2033A_1_38E_BC38 97677 233 6339576 455953 10988032042
4 BCH_01HI2317A_1_38E_BC38 99034 233 6339576 455953 10988032042
... ... ... ... ... ... ...
24094 BCH_WWS902A_1_38E_BC38 91661 233 6339576 455953 10988032042
24095 BCH_WWS904A_1_38E_BC38 93378 233 6339576 455953 10988032042
24096 BCH_WWS9101X_1_38E_BC38 106516 233 6339576 455953 10988032042
24097 BCH_WWS9104X_1_38E_BC38 105853 233 6339576 455953 10988032042
24098 BCH_WWS9902X_1_38E_BC38 108602 233 6339576 455953 10988032042

24099 rows × 6 columns

We see that there are close to 11 billion variants in total, about 455k on average per sample.
