diff --git a/VERSION b/VERSION index cdb98d26e..76e9e619d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.1.3 +5.1.4 diff --git a/base/src/main/java/org/gorpipe/base/config/converters/ByteSizeConverter.java b/base/src/main/java/org/gorpipe/base/config/converters/ByteSizeConverter.java index 3dac7c053..5a9b324b6 100644 --- a/base/src/main/java/org/gorpipe/base/config/converters/ByteSizeConverter.java +++ b/base/src/main/java/org/gorpipe/base/config/converters/ByteSizeConverter.java @@ -36,7 +36,7 @@ public ByteSize convert(Method method, String input) { return parse(input); } - private static ByteSize parse(String input) { + public static ByteSize parse(String input) { String[] parts = ConverterUtil.splitNumericAndChar(input); String value = parts[0]; String unit = parts[1]; diff --git a/buildSrc/src/main/groovy/gor.java-common.gradle b/buildSrc/src/main/groovy/gor.java-common.gradle index a5467992a..c72fcdc48 100644 --- a/buildSrc/src/main/groovy/gor.java-common.gradle +++ b/buildSrc/src/main/groovy/gor.java-common.gradle @@ -58,17 +58,17 @@ repositories { excludeGroupByRegex "com\\.(wuxi)?nextcode.*" } } - maven { - name "gitlab-maven" - url uri("https://gitlab.com/api/v4/groups/2443025/-/packages/maven") - credentials(HttpHeaderCredentials) { - name = project.findProperty('gitlab_token') ? "Private-Token" : "Job-Token" - value = project.findProperty('gitlab_token') ?: System.getenv("CI_JOB_TOKEN") - } - authentication { - header(HttpHeaderAuthentication) - } - } +// maven { +// name "gitlab-maven" +// url uri("https://gitlab.com/api/v4/groups/2443025/-/packages/maven") +// credentials(HttpHeaderCredentials) { +// name = project.findProperty('gitlab_token') ? 
"Private-Token" : "Job-Token" +// value = project.findProperty('gitlab_token') ?: System.getenv("CI_JOB_TOKEN") +// } +// authentication { +// header(HttpHeaderAuthentication) +// } +// } } java { diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3SourceProvider.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3SourceProvider.java index 0fe631840..463066d13 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3SourceProvider.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3SourceProvider.java @@ -62,6 +62,7 @@ public class S3SourceProvider extends StreamSourceProvider { private static final boolean USE_CRT_CLIENT = Boolean.parseBoolean(System.getProperty("gor.s3.client.crt", "true")); private static final boolean USE_ASYNC_CLIENT = Boolean.parseBoolean(System.getProperty("gor.s3.client.async", "false")); + private static final boolean FORCE_PATH_STYLE = Boolean.parseBoolean(System.getProperty("gor.s3.forcePathStyle", "false")); private final CredentialClientCache clientCredCache = new CredentialClientCache<>(S3SourceType.S3.getName(), this::createClient); private final CredentialClientCache asyncClientCredCache = new CredentialClientCache<>(S3SourceType.S3.getName(), this::createAsyncClient); @@ -110,17 +111,17 @@ protected S3Client getClient(String securityContext, String bucket) throws IOExc private S3Client createClient(Credentials cred) { if (USE_CRT_CLIENT) { - return createCrtClient(cred); + return createSyncCrtClient(cred); } return createApacheClient(cred); } - private S3Client createCrtClient(Credentials cred) { + private S3Client createSyncCrtClient(Credentials cred) { var builder = S3Client.builder(); AwsCrtHttpClient.Builder httpClientBuilder = AwsCrtHttpClient.builder() .connectionTimeout(s3Config.connectionTimeout()) // Default was 2s - .maxConcurrency(s3Config.connectionPoolSize()) // Default was 50 + .maxConcurrency(s3Config.connectionPoolSize()) .tcpKeepAliveConfiguration(b -> b 
.keepAliveInterval(Duration.ofMillis(s3Config.socketTimeout().toMillis()/2)) .keepAliveTimeout(s3Config.connectionTimeout())) @@ -231,8 +232,13 @@ private S3AsyncClient createAsyncCrtClient(Credentials cred) { builder.httpConfiguration(httpConfigBuilder.build()); + // See: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java builder.maxConcurrency(s3Config.connectionPoolSize()); //builder.accelerate(true) + //builder.targetThroughputInGbps(5.0); + //builder.maxNativeMemoryLimitInBytes(1L * 1024 * 1024 * 1024); + //builder.minimumPartSizeInBytes(5L * 1024 * 1024); + //builder.initialReadBufferSizeInBytes(1L * 1024 * 1024); var endpoint = getEndpoint(cred); if (!StringUtil.isEmpty(endpoint)) { @@ -280,7 +286,7 @@ private void applyBaseClientConfig(S3BaseClientBuilder builder, Credential builder.overrideConfiguration(c -> c.scheduledExecutorService(scheduledExecutorService)); // OCI compat layer needs path style access. 
- if (isOciEndpoint(endpoint)) { + if (isOciEndpoint(endpoint) || FORCE_PATH_STYLE) { builder.forcePathStyle(true); } diff --git a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCIBvlTestSuite.java b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCIBvlTestSuite.java index ede16c18f..a9b8cfc5f 100644 --- a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCIBvlTestSuite.java +++ b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCIBvlTestSuite.java @@ -13,7 +13,7 @@ @Category(IntegrationTests.class) public class ITestOCIBvlTestSuite extends BvlTestSuite { - private String bucketName = "gdb_gor_test_data_dev"; + private String bucketName = "gdb-gor-test-data-dev"; private static String OCI_TENANT; private static String OCI_USER; diff --git a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonFilesTests.java b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonFilesTests.java index 7b6410dc9..dfc2290b3 100644 --- a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonFilesTests.java +++ b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonFilesTests.java @@ -19,7 +19,7 @@ @Category(IntegrationTests.class) public class ITestOCICommonFilesTests extends CommonFilesTests { - private String bucketName = "gdb_gor_test_data_dev"; + private String bucketName = "gdb-gor-test-data-dev"; private static String OCI_TENANT; private static String OCI_USER; diff --git a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonStreamTests.java b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonStreamTests.java index 34246a276..d1db45d3c 100644 --- a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonStreamTests.java +++ b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCICommonStreamTests.java @@ -26,7 +26,7 @@ @Category(IntegrationTests.class) public class ITestOCICommonStreamTests extends CommonStreamTests { - private String bucketName = "gdb_gor_test_data_dev"; + private String bucketName = "gdb-gor-test-data-dev"; 
private static String OCI_TENANT; private static String OCI_USER; diff --git a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCISource.java b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCISource.java index 390df8920..e0bad7529 100644 --- a/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCISource.java +++ b/drivers/src/test/java/org/gorpipe/oci/driver/ITestOCISource.java @@ -25,7 +25,7 @@ @Category(IntegrationTests.class) public class ITestOCISource { - private String bucketName = "gdb_gor_test_data_dev"; + private String bucketName = "gdb-gor-test-data-dev"; private static String OCI_TENANT; private static String OCI_USER; diff --git a/drivers/src/test/java/org/gorpipe/oci/driver/UTestOCIObjectStorageSourceType.java b/drivers/src/test/java/org/gorpipe/oci/driver/UTestOCIObjectStorageSourceType.java index 1fc9932bd..e56a4a635 100644 --- a/drivers/src/test/java/org/gorpipe/oci/driver/UTestOCIObjectStorageSourceType.java +++ b/drivers/src/test/java/org/gorpipe/oci/driver/UTestOCIObjectStorageSourceType.java @@ -42,29 +42,29 @@ static public void setUpClass() { @Test public void testResolveGorOciUrl() throws IOException { var source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("oci://gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("oci://gdb-gor-test-data-dev/the/path.dat", securityContext(), null, null, null, false)); assertEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("oc://gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("oc://gdb-gor-test-data-dev/the/path.dat", securityContext(), null, null, null, false)); assertEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); } @Test public void testResolveOciHttpUrl() throws IOException { var source = 
GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("https://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("https://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/gdb-gor-test-data-dev/the/path.dat", securityContext(), null, null, null, false)); assertEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("http://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("http://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/gdb-gor-test-data-dev/the/path.dat", securityContext(), null, null, null, false)); assertEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); } @Test public void testResolveOciNativeHttpUrl() throws IOException { var source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("https://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/n/namespace/b/gdb_gor_test_data_dev/o/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("https://namespace.objectstorage.us-ashburn-1.oci.customer-oci.com/n/namespace/b/gdb-gor-test-data-dev/o/the/path.dat", securityContext(), null, null, null, false)); assertEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); } @@ -82,11 +82,11 @@ public void testResolveNonOciHttpUrl() throws IOException { @Test public void testResolveS3HttpUrl() throws IOException { var source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("https://s3.us-east1.amazonaws.com/gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("https://s3.us-east1.amazonaws.com/gdb-gor-test-data-dev/the/path.dat", 
securityContext(), null, null, null, false)); assertNotEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); source = GorDriverFactory.fromConfig().resolveDataSource( - new SourceReference("http://s3.us-east1.amazonaws.com/gdb_gor_test_data_dev/the/path.dat", securityContext(), null, null, null, false)); + new SourceReference("http://s3.us-east1.amazonaws.com/gdb-gor-test-data-dev/the/path.dat", securityContext(), null, null, null, false)); assertNotEquals(OCIObjectStorageSourceType.OCI_OBJECT_STORAGE, source.getSourceType()); } diff --git a/drivers/src/testFixtures/java/org/gorpipe/utils/DriverUtils.java b/drivers/src/testFixtures/java/org/gorpipe/utils/DriverUtils.java index 9eaf67cb4..d8f29b875 100644 --- a/drivers/src/testFixtures/java/org/gorpipe/utils/DriverUtils.java +++ b/drivers/src/testFixtures/java/org/gorpipe/utils/DriverUtils.java @@ -86,7 +86,7 @@ public static String awsSecurityContext(String key, String secret) { public static String ociSecurityContext(String tenant, String user, String secret, String fingerprint) { // Credentials for gor_unittest user in nextcode OCI account Credentials cred = new Credentials.Builder().service("oci") - .lookupKey("gdb_gor_test_data_dev") + .lookupKey("gdb-gor-test-data-dev") .set(Credentials.Attr.API_ENDPOINT, DEFAULT_OCI_ENDPOINT) .set(Credentials.Attr.REALM, tenant) .set(Credentials.Attr.KEY, fingerprint) diff --git a/gorscripts/src/main/java/org/gorpipe/gor/cli/GorConfigDoc.java b/gorscripts/src/main/java/org/gorpipe/gor/cli/GorConfigDoc.java index 4f5bba923..49954b7e0 100644 --- a/gorscripts/src/main/java/org/gorpipe/gor/cli/GorConfigDoc.java +++ b/gorscripts/src/main/java/org/gorpipe/gor/cli/GorConfigDoc.java @@ -122,7 +122,7 @@ public String toString() { try (Formatter formatter = new Formatter()) { for (String[] row : rows) { - formatter.format(format, row); + formatter.format(format, (Object[]) row); } return header + "\n" + formatter.out().toString(); diff --git
a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala index 6bbfe68e8..5edb9cb58 100644 --- a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala +++ b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala @@ -22,7 +22,6 @@ package gorsat.Analysis -import java.nio.file.{Files, Path, Paths, StandardOpenOption} import java.util.zip.Deflater import gorsat.Commands.{Analysis, Output, RowHeader} import gorsat.Outputs.OutFile @@ -158,8 +157,9 @@ case class ForkWrite(forkCol: Int, * @return */ def createOutFile(name: String, skipHeader: Boolean): Output = { - if (rowHeader==null || useFork) OutFile.driver(name, session.getProjectContext.getFileReader, header, skipHeader, options) - else { + if (rowHeader == null || useFork) { + OutFile.driver(name, session.getProjectContext.getFileReader, header, skipHeader, options) + } else { if (!rowHeader.toString.equals(header)) { rowHeader = RowHeader(header, rowHeader.columnTypes) } @@ -284,6 +284,15 @@ case class ForkWrite(forkCol: Int, } } + // Available after finish. 
+ def getMd5: String = { + if (!useFork && singleFileHolder.out != null) { + singleFileHolder.out.getMeta.getMd5 + } else { + "" + } + } + private def extractLink(fileName: String) : (String,String) = { var linkFile = options.linkFile var linkFileContent = "" diff --git a/gortools/src/main/scala/gorsat/Analysis/OrderedMapAnalysis.scala b/gortools/src/main/scala/gorsat/Analysis/OrderedMapAnalysis.scala index 8bca6bb97..e65f5cd9a 100644 --- a/gortools/src/main/scala/gorsat/Analysis/OrderedMapAnalysis.scala +++ b/gortools/src/main/scala/gorsat/Analysis/OrderedMapAnalysis.scala @@ -161,7 +161,7 @@ case class OrderedMapAnalysis(session: GorSession, } private def validateKeyOrder(r: Row, key: String, prevKey: String, source: String): Unit = { - if (prevKey != null && !prevKey.isEmpty && prevKey > key) { + if (prevKey != null && !prevKey.isEmpty && prevKey.compareTo(key) > 0) { throw new GorDataException( String.format("%s source is not ordered, as required if the -ordered options is used. " + "Row '%s' is out of order. 
Key/Prevkey was '%s'/'%s'.", diff --git a/gortools/src/main/scala/gorsat/Analysis/RegressionAnalysis.scala b/gortools/src/main/scala/gorsat/Analysis/RegressionAnalysis.scala index b3ee64fa8..dc09aaff6 100644 --- a/gortools/src/main/scala/gorsat/Analysis/RegressionAnalysis.scala +++ b/gortools/src/main/scala/gorsat/Analysis/RegressionAnalysis.scala @@ -106,7 +106,7 @@ abstract class RegressionAnalysis[T: Manifest](lookUpSignature: String, session: var vIdx = 0 while (tfIdx < len) { val nextIdx = values.indexOf(sep, vIdx) - val cand = if (nextIdx > 0) values.substring(vIdx, nextIdx) else values.substring(vIdx) + val cand = if (nextIdx >= 0) values.substring(vIdx, nextIdx) else values.substring(vIdx) if (cand == "") { filter(tfIdx) = false } else { diff --git a/gortools/src/main/scala/gorsat/Monitors/CancelMonitor.scala b/gortools/src/main/scala/gorsat/Monitors/CancelMonitor.scala index e0d7ef799..57936b2ad 100644 --- a/gortools/src/main/scala/gorsat/Monitors/CancelMonitor.scala +++ b/gortools/src/main/scala/gorsat/Monitors/CancelMonitor.scala @@ -27,6 +27,8 @@ import org.gorpipe.gor.model.Row import org.gorpipe.gor.monitor.GorMonitor case class CancelMonitor(gm : GorMonitor) extends Analysis { + override def isTypeInformationMaintained: Boolean = true + override def process(r : Row): Unit = { if (gm.isCancelled()) { reportWantsNoMore() diff --git a/gortools/src/main/scala/gorsat/Monitors/MemoryMonitor.scala b/gortools/src/main/scala/gorsat/Monitors/MemoryMonitor.scala index 7dc70e983..8ba375a6b 100644 --- a/gortools/src/main/scala/gorsat/Monitors/MemoryMonitor.scala +++ b/gortools/src/main/scala/gorsat/Monitors/MemoryMonitor.scala @@ -35,6 +35,8 @@ case class MemoryMonitor(logname: String, throw new GorLowMemoryException(msg) }, minFreeMemMB, minFreeMemRatio) + override def isTypeInformationMaintained: Boolean = true + override def process(r: Row): Unit = { mmu.check(r) super.process(r) diff --git a/gortools/src/main/scala/gorsat/Monitors/MonitorLog.scala 
b/gortools/src/main/scala/gorsat/Monitors/MonitorLog.scala index 1e9da28e2..be8a71324 100644 --- a/gortools/src/main/scala/gorsat/Monitors/MonitorLog.scala +++ b/gortools/src/main/scala/gorsat/Monitors/MonitorLog.scala @@ -28,6 +28,9 @@ import org.gorpipe.gor.monitor.GorMonitor case class MonitorLog(logname : String, n : Int, gm : GorMonitor) extends Analysis { var m = 0L + + override def isTypeInformationMaintained: Boolean = true + override def process(r : Row): Unit = { m += 1; if ((m % n) == 0) gm.log(logname+"> ("+m+") "+r.toColString) super.process(r) diff --git a/gortools/src/main/scala/gorsat/Monitors/MonitorProgress.scala b/gortools/src/main/scala/gorsat/Monitors/MonitorProgress.scala index 266f060a0..5a7caf90e 100644 --- a/gortools/src/main/scala/gorsat/Monitors/MonitorProgress.scala +++ b/gortools/src/main/scala/gorsat/Monitors/MonitorProgress.scala @@ -31,6 +31,9 @@ case class MonitorProgress(milliSec : Int, gm : GorMonitor) extends Analysis { var t = System.currentTimeMillis var lastRowChr : String = "" var lastRowPos : Int = 0 + + override def isTypeInformationMaintained: Boolean = true + override def process(r : Row): Unit = { m += 1 if (r.chr != lastRowChr) { diff --git a/gortools/src/main/scala/gorsat/Monitors/StatsMonitor.scala b/gortools/src/main/scala/gorsat/Monitors/StatsMonitor.scala new file mode 100644 index 000000000..b9fc3df96 --- /dev/null +++ b/gortools/src/main/scala/gorsat/Monitors/StatsMonitor.scala @@ -0,0 +1,55 @@ +/* + * BEGIN_COPYRIGHT + * + * Copyright (C) 2011-2013 deCODE genetics Inc. + * Copyright (C) 2013-2019 WuXi NextCode Inc. + * All Rights Reserved. + * + * GORpipe is free software: you can redistribute it and/or modify + * it under the terms of the AFFERO GNU General Public License as published by + * the Free Software Foundation. + * + * GORpipe is distributed "AS-IS" AND WITHOUT ANY WARRANTY OF ANY KIND, + * INCLUDING ANY IMPLIED WARRANTY OF MERCHANTABILITY, + * NON-INFRINGEMENT, OR FITNESS FOR A PARTICULAR PURPOSE. 
See + * the AFFERO GNU General Public License for the complete license terms. + * + * You should have received a copy of the AFFERO GNU General Public License + * along with GORpipe. If not, see + * + * END_COPYRIGHT + */ + +package gorsat.Monitors + +import gorsat.Commands.{Analysis, RowHeader} +import org.gorpipe.gor.model.Row + +/** + * Collect basic stats about the analysis stream. + */ +case class StatsMonitor() extends Analysis { + var startTime: Long = System.currentTimeMillis + var stopTime = 0L + + var rowCount = 0L + var bytesCount = 0L + + override def isTypeInformationMaintained: Boolean = true + + def elapsedTime(): Long = if (stopTime > 0) stopTime - startTime else System.currentTimeMillis - startTime + + override def process(r : Row): Unit = { + bytesCount += r.length() + rowCount += 1 + + + super.process(r) + } + override def finish(): Unit = { + if (rowHeader == null) { + rowHeader = new RowHeader(Array(), Array()) + } + stopTime = System.currentTimeMillis; + } +} diff --git a/gortools/src/main/scala/gorsat/Monitors/TimeoutMonitor.scala b/gortools/src/main/scala/gorsat/Monitors/TimeoutMonitor.scala index ccd19fea2..6e620ac04 100644 --- a/gortools/src/main/scala/gorsat/Monitors/TimeoutMonitor.scala +++ b/gortools/src/main/scala/gorsat/Monitors/TimeoutMonitor.scala @@ -39,7 +39,6 @@ case class TimeoutMonitor() extends Analysis { // Check for timeout every (checkEveryXRows) rows, and also at the first call to process on the instance var resettingRowCounter = checkEveryXRows - override def isTypeInformationMaintained: Boolean = true override def process(r: Row): Unit = { diff --git a/gortools/src/test/java/gorsat/external/plink/UTestPlinkRegression.java b/gortools/src/test/java/gorsat/external/plink/UTestPlinkRegression.java index 48a03a524..fb3571598 100644 --- a/gortools/src/test/java/gorsat/external/plink/UTestPlinkRegression.java +++ b/gortools/src/test/java/gorsat/external/plink/UTestPlinkRegression.java @@ -30,6 +30,10 @@ import 
java.nio.file.Path; import java.nio.file.Paths; +/* Can be run in container, something like: +docker run -it --rm --name gor-test -v .:/opt/gor/src/ -w /opt/gor/src/ us-ashburn-1.ocir.io/id5mlxoq0dmt/genedx/gdb-gor-services:v13.1.3 bash +export GRADLE_USER_HOME=/tmp/.gradle + */ @Ignore("Needs plink2 installed") public class UTestPlinkRegression { String vcfheader = "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ta\tb\tc\td\te\tf\tg\ti\tj\n"; diff --git a/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java b/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java new file mode 100644 index 000000000..1eb19d9fc --- /dev/null +++ b/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java @@ -0,0 +1,107 @@ +package gorsat.monitors; + +import gorsat.Analysis.ForkWrite; +import gorsat.Analysis.OutputOptions; +import gorsat.Monitors.StatsMonitor; +import gorsat.process.PipeInstance; +import org.gorpipe.gor.binsearch.GorIndexType; +import org.gorpipe.gor.monitor.GorMonitor; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Option; + +import java.nio.file.Path; +import java.util.zip.Deflater; + +import static gorsat.TestUtils.createPipeInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StatsMonitorTest{ + + @Rule + public TemporaryFolder workDir = new TemporaryFolder(); + private Path workDirPath; + + @Before + public void setupTest() { + workDirPath = workDir.getRoot().toPath(); + } + + @Test + public void testRowCountAndBytesCount() { + + var statsMonitor = new StatsMonitor(); + try (PipeInstance pipe = createPipeInstance(false)) { + pipe.init("gorrows -p chr1:1-1000 | calc a 'abc'", null); + pipe.lastStep().$bar(statsMonitor); + + while (pipe.hasNext()) { + pipe.next(); + } + } + + assertEquals("chrom\tpos\ta", statsMonitor.getHeader()); + assertEquals(999, statsMonitor.rowCount()); + 
assertEquals(11880, statsMonitor.bytesCount()); + } + + @Test + public void testElapsedTime() throws InterruptedException { + StatsMonitor monitor = new StatsMonitor(); + Thread.sleep(100); + monitor.finish(); + + long elapsedTime = monitor.elapsedTime(); + assertTrue(elapsedTime >= 100); + } + + @Test + public void testRowCountAndBytesCountForSelfWriting() { + + var statsMonitor = new StatsMonitor(); + try (PipeInstance pipe = createPipeInstance(false)) { + pipe.init("gorrows -p chr1:1-1000 | calc a 'abc' | write " + workDirPath.resolve("test.gor").toString(), new GorMonitor()); + pipe.lastStep().$bar(statsMonitor); + + while (pipe.hasNext()) { + pipe.next(); + } + } + + assertEquals("", statsMonitor.getHeader()); + assertEquals(0, statsMonitor.rowCount()); + assertEquals(0, statsMonitor.bytesCount()); + } + + @Test + public void testRowCountAndBytesCountForAddedWrite() { + + var statsMonitor = new StatsMonitor(); + ForkWrite forkWrite = null; + try (PipeInstance pipe = createPipeInstance(true)) { + pipe.init("gorrows -p chr1:1-1000 | calc a 'abc'", new GorMonitor()); + pipe.lastStep().$bar(statsMonitor); + + var outputOptions = new OutputOptions(false, false, true, false, + false, GorIndexType.NONE, new String[0], new String[0], Option.empty(), Option.empty(), Deflater.BEST_SPEED, + Option.empty(), false, false, null, "", null, false, false); + forkWrite = new ForkWrite(-1, workDirPath.resolve("test.gor").toString(), pipe.getSession(), pipe.getHeader(), outputOptions); + + pipe.lastStep().$bar(forkWrite); + + while (pipe.hasNext()) { + pipe.next(); + } + } + + assertEquals("chrom\tpos\ta", statsMonitor.getHeader()); + assertEquals(999, statsMonitor.rowCount()); + assertEquals(11880, statsMonitor.bytesCount()); + + assertEquals("c84f18441d00c7ea79d233efdaf8f07b", forkWrite.getMd5()); + } + +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/GorDriverConfig.java b/model/src/main/java/org/gorpipe/gor/driver/GorDriverConfig.java index e9af73a66..345723ffe 
100644 --- a/model/src/main/java/org/gorpipe/gor/driver/GorDriverConfig.java +++ b/model/src/main/java/org/gorpipe/gor/driver/GorDriverConfig.java @@ -133,7 +133,7 @@ public interface GorDriverConfig extends Config { @Documentation("The maximum range to which to automatically extend the source reads.") @Key("org.gorpipe.gor.driver.extended_range_streaming.max_request_size") - @DefaultValue("100 mb") + @DefaultValue("1 mb") @ConverterClass(ByteSizeConverter.class) ByteSize extendedRangeStreamingMaxRequestSize(); diff --git a/model/src/main/java/org/gorpipe/gor/driver/pgen/VariableWidthPGenOutputStream.java b/model/src/main/java/org/gorpipe/gor/driver/pgen/VariableWidthPGenOutputStream.java index 9873e3280..4d6e56f4b 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/pgen/VariableWidthPGenOutputStream.java +++ b/model/src/main/java/org/gorpipe/gor/driver/pgen/VariableWidthPGenOutputStream.java @@ -106,6 +106,7 @@ public void close() throws IOException { this.os = fileReaderOptional.isPresent() ? 
fileReaderOptional.get().getOutputStream(this.fileName) : new FileOutputStream(this.fileName); writeHeader(); mergeVariantBlocks(); + closeCurrentStream(); } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/ExtendedRangeWrapper.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/ExtendedRangeWrapper.java index 238aecbc1..767af7dbd 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/ExtendedRangeWrapper.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/wrappers/ExtendedRangeWrapper.java @@ -22,6 +22,7 @@ package org.gorpipe.gor.driver.providers.stream.sources.wrappers; +import org.gorpipe.base.config.converters.ByteSizeConverter; import org.gorpipe.exceptions.GorResourceException; import org.gorpipe.gor.driver.adapters.PersistentInputStream; import org.gorpipe.gor.driver.providers.stream.RequestRange; @@ -62,9 +63,9 @@ public class ExtendedRangeWrapper extends WrappedStreamSource { private static final Logger log = LoggerFactory.getLogger(ExtendedRangeWrapper.class); - public static final int DEFAULT_SEEK_THRESHOLD = 32 * 1024; // 32kB - public static final int DEFAULT_MAX_RANGE = 100 * 1024 * 1024; - public static final int DEFAULT_HEADER_RANGE = 1024 * 1024; // 1MB + public static final int DEFAULT_SEEK_THRESHOLD = ByteSizeConverter.parse(System.getProperty("org.gorpipe.gor.driver.extended_range_streaming.seek_threshold", "64 kb")).getBytesAsInt(); + public static final int DEFAULT_MIN_RANGE = ByteSizeConverter.parse(System.getProperty("org.gorpipe.gor.driver.extended_range_streaming.min_request_size", "64 kb")).getBytesAsInt(); + public static final int DEFAULT_MAX_RANGE = ByteSizeConverter.parse(System.getProperty("org.gorpipe.gor.driver.extended_range_streaming.max_request_size", "1 mb")).getBytesAsInt(); private final int seekThreshold; private final int maxRange; @@ -85,12 +86,12 @@ public 
ExtendedRangeWrapper(StreamSource source, int seekThreshold, int maxRange @Override public InputStream openClosable() { - return open(0, DEFAULT_HEADER_RANGE); + return open(0, DEFAULT_MIN_RANGE); } @Override public InputStream open() { - return open(0, DEFAULT_HEADER_RANGE); + return open(0, DEFAULT_MIN_RANGE); } @Override @@ -180,7 +181,7 @@ public int read(byte[] b, int off, int len) { // 2. Calculate request length - double of last request up to the maximum. But no smaller than the remaining read. //long rlen = Math.max(len - read, Math.min(lastRequest.getLength() * 2, maxRange)); - long rlen = Math.max(len - read, Math.min(Math.max(lastRequest.getLength() * 2, DEFAULT_HEADER_RANGE), maxRange)); + long rlen = Math.max(len - read, Math.min(Math.max(lastRequest.getLength() * 2, DEFAULT_MIN_RANGE), maxRange)); // 3. Open new 'in' stream at last position + new request length diff --git a/model/src/main/scala/gorsat/Commands/Analysis.scala b/model/src/main/scala/gorsat/Commands/Analysis.scala index a16ed635e..f6a026b02 100644 --- a/model/src/main/scala/gorsat/Commands/Analysis.scala +++ b/model/src/main/scala/gorsat/Commands/Analysis.scala @@ -33,7 +33,7 @@ abstract class Analysis() extends Processor with Cloneable { var nextProcessor: Processor = _ var alreadyFinished = false var isInErrorState = false - var rowHeader: RowHeader = _ + var rowHeader: RowHeader = _ // Input row header. 
var cloned : Analysis = _ var isCloned = false @@ -124,6 +124,9 @@ abstract class Analysis() extends Processor with Cloneable { pipeTo = to nextProcessor = to to.from(this) + if (rowHeader != null) { + setRowHeader(rowHeader) + } } this } @@ -140,8 +143,9 @@ abstract class Analysis() extends Processor with Cloneable { null } + // Input header string def getHeader(): String = { - null + if (rowHeader != null) rowHeader.toString else null } def setup() : Unit = {} diff --git a/test/src/main/java/gorsat/TestUtils.java b/test/src/main/java/gorsat/TestUtils.java index a7c23387e..272dee349 100644 --- a/test/src/main/java/gorsat/TestUtils.java +++ b/test/src/main/java/gorsat/TestUtils.java @@ -292,7 +292,7 @@ private static PipeInstance createPipeInstance(boolean server, String securityCo return PipeInstance.createGorIterator(new GorContext(createSession(server, securityContext, writeLocations))); } - private static PipeInstance createPipeInstance(boolean server, String gorroot, String cacheDir, String securityContext, String[] writeLocations) { + public static PipeInstance createPipeInstance(boolean server, String gorroot, String cacheDir, String securityContext, String[] writeLocations) { List options = new ArrayList<>(); if (gorroot != null) { options.add("-gorroot"); @@ -318,7 +318,7 @@ private static String runGorPipeWithOptions(String query, boolean includeHeader, } } - private static PipeInstance createPipeInstance(boolean server) { + public static PipeInstance createPipeInstance(boolean server) { return PipeInstance.createGorIterator(createSession(server).getGorContext()); } diff --git a/versions.properties b/versions.properties index e903511ae..819877f16 100644 --- a/versions.properties +++ b/versions.properties @@ -74,6 +74,7 @@ version.software.amazon.awssdk..aws-crt-client=2.29.47 ## # available=2.30.5 ## # available=2.30.6 ## # available=2.30.7 +## # available=2.30.8 version.software.amazon.awssdk..netty-nio-client=2.29.47 ## # available=2.29.48 @@ -88,6 
+89,8 @@ version.software.amazon.awssdk..netty-nio-client=2.29.47 ## # available=2.30.4 ## # available=2.30.5 ## # available=2.30.6 +## # available=2.30.7 +## # available=2.30.8 version.software.amazon.awssdk..s3=2.29.47 ## # available=2.29.48 @@ -102,6 +105,8 @@ version.software.amazon.awssdk..s3=2.29.47 ## # available=2.30.4 ## # available=2.30.5 ## # available=2.30.6 +## # available=2.30.7 +## # available=2.30.8 #version.software.amazon.nio.s3..aws-java-nio-spi-for-s3=2.0.5 @@ -112,6 +117,7 @@ version.org.carlspring.cloud.aws..s3fs-nio=1.0.6 ## # available=3.0.0 version.com.auth0..java-jwt=4.4.0 +## # available=4.5.0 version.com.fasterxml.jackson.core..jackson-databind=2.18.2 @@ -128,6 +134,7 @@ version.com.google.auto.service..auto-service=1.1.1 version.com.google.cloud..google-cloud-storage=2.46.0 ## # available=2.47.0 +## # available=2.48.0 version.com.google.guava..guava=33.4.0-jre @@ -285,6 +292,7 @@ version.jakarta.json..jakarta.json-api=2.1.3 version.software.amazon.awssdk.crt..aws-crt=0.33.7 ## # available=0.33.9 +## # available=0.33.10 ## unused version.software.amazon.nio.s3..aws-java-nio-spi-for-s3=2.2.0