From 5109b952f4c189f68c25007f2d0b3b7535955bf6 Mon Sep 17 00:00:00 2001
From: fwbrasil
Date: Thu, 14 Dec 2017 12:03:45 -0800
Subject: [PATCH] introduce modes for required ordered serialization

ScaldingRequireOrderedSerialization now selects a mode instead of a plain
boolean (values are matched case-insensitively):

  * "fail" (or the legacy "true"): the OrderedSerialization check fails for
    any groupBy/cogroup that is not using an OrderedSerialization.
  * "log": the same check runs, but violations are only logged as warnings.
  * any other value (e.g. "false") or no value: the check is skipped.

setRequireOrderedSerialization/getRequireOrderedSerialization are kept but
deprecated in favour of the *Mode variants. Passing None to
setRequireOrderedSerializationMode removes the setting, so it also disables
a previously configured legacy "true".
---
 .../scala/com/twitter/scalding/Config.scala   | 31 +++++++---
 .../twitter/scalding/ExecutionContext.scala   |  9 +--
 .../CascadingBinaryComparator.scala           | 18 +++++-
 .../RequiredBinaryComparators.scala           |  8 ++-
 .../RequiredBinaryComparatorsConfig.scala     | 15 ++++-
 .../RequireOrderedSerializationTest.scala     | 82 +++++++++++++++----
 6 files changed, 135 insertions(+), 28 deletions(-)

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala
index af5a6c9ea3..74fbdee837 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala
@@ -34,6 +34,7 @@ import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.util.{ Failure, Success, Try }
+import com.twitter.scalding.serialization.RequireOrderedSerializationMode
 
 /**
  * This is a wrapper class on top of Map[String, String]
@@ -138,17 +139,37 @@ trait Config extends Serializable {
   def setMapSideAggregationThreshold(count: Int): Config =
     this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString)
 
+  @deprecated("Use setRequireOrderedSerializationMode", "12/14/17")
+  def setRequireOrderedSerialization(b: Boolean): Config =
+    this + (ScaldingRequireOrderedSerialization -> (b.toString))
+
+  @deprecated("Use getRequireOrderedSerializationMode", "12/14/17")
+  def getRequireOrderedSerialization: Boolean =
+    getRequireOrderedSerializationMode == Some(RequireOrderedSerializationMode.Fail)
+
   /**
    * Set this configuration option to require all grouping/cogrouping
    * to use OrderedSerialization
    */
-  def setRequireOrderedSerialization(b: Boolean): Config =
-    this + (ScaldingRequireOrderedSerialization -> (b.toString))
+  def setRequireOrderedSerializationMode(r: Option[RequireOrderedSerializationMode]): Config =
+    r match {
+      case Some(m) => this + (ScaldingRequireOrderedSerialization -> m.toString)
+      // None disables the check, so also drop a previously set value (e.g. a legacy "true")
+      case None => this - ScaldingRequireOrderedSerialization
+    }
 
-  def getRequireOrderedSerialization: Boolean =
+  /**
+   * Reads the mode case-insensitively: "fail" or the legacy "true" => Fail,
+   * "log" => Log. Any other value (e.g. "false") or a missing key => None,
+   * in which case no OrderedSerialization check is performed.
+   */
+  def getRequireOrderedSerializationMode: Option[RequireOrderedSerializationMode] =
     get(ScaldingRequireOrderedSerialization)
-      .map(_.toBoolean)
-      .getOrElse(false)
+      .map(_.trim.toLowerCase(java.util.Locale.ROOT)).collect {
+        case "true" => RequireOrderedSerializationMode.Fail // backwards compatibility
+        case "fail" => RequireOrderedSerializationMode.Fail
+        case "log" => RequireOrderedSerializationMode.Log
+      }
 
   def getCascadingSerializationTokens: Map[Int, String] =
     get(Config.CascadingSerializationTokens)
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala
index f551b80b86..27a57743a4 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala
@@ -73,10 +73,11 @@ trait ExecutionContext {
     // identify the flowDef
     val configWithId = config.addUniqueId(UniqueID.getIDFor(flowDef))
     val flow = mode.newFlowConnector(configWithId).connect(flowDef)
-    if (config.getRequireOrderedSerialization) {
-      // This will throw, but be caught by the outer try if
-      // we have groupby/cogroupby not using OrderedSerializations
-      CascadingBinaryComparator.checkForOrderedSerialization(flow).get
+
+    config.getRequireOrderedSerializationMode.foreach { requireMode =>
+      // In Fail mode this throws, but is caught by the outer try, if we have
+      // groupby/cogroupby not using OrderedSerializations; Log mode only warns.
+      CascadingBinaryComparator.checkForOrderedSerialization(flow, requireMode).get
     }
 
     flow match {
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
index 75c9cfae41..8fb6e6d172 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
@@ -23,6 +23,7 @@ import com.twitter.scalding.ExecutionContext.getDesc
 import java.io.InputStream
 import java.util.Comparator
 import scala.util.{ Failure, Success, Try }
+import org.slf4j.LoggerFactory
 
 /**
  * This is the type that should be fed to cascading to enable binary comparators
@@ -40,11 +41,13 @@ class CascadingBinaryComparator[T](ob: OrderedSerialization[T]) extends Comparat
 
 object CascadingBinaryComparator {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   /**
    * This method will walk the flowDef and make sure all the
    * groupBy/cogroups are using a CascadingBinaryComparator
    */
-  private[scalding] def checkForOrderedSerialization[T](flow: Flow[T]): Try[Unit] = {
+  private[scalding] def checkForOrderedSerialization[T](flow: Flow[T], mode: RequireOrderedSerializationMode): Try[Unit] = {
     import collection.JavaConverters._
     import cascading.pipe._
     import com.twitter.scalding.RichPipe
@@ -53,8 +56,17 @@ object CascadingBinaryComparator {
     def reduce(it: TraversableOnce[Try[Unit]]): Try[Unit] =
       it.find(_.isFailure).getOrElse(Success(()))
 
-    def failure(s: String): Try[Unit] =
-      Failure(new RuntimeException("Cannot verify OrderedSerialization: " + s))
+    def failure(s: String): Try[Unit] = {
+      val message =
+        s"Cannot verify OrderedSerialization: $s. Add `import com.twitter.scalding.serialization.RequiredBinaryComparators._`"
+      mode match {
+        case RequireOrderedSerializationMode.Fail =>
+          Failure(new RuntimeException(message))
+        case RequireOrderedSerializationMode.Log =>
+          LOG.warn(message)
+          Success(())
+      }
+    }
 
     def check(s: Splice): Try[Unit] = {
       val m = s.getKeySelectors.asScala
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparators.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparators.scala
index f757c84ec6..b188c982f8 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparators.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparators.scala
@@ -26,9 +26,15 @@ object RequiredBinaryComparators {
    */
   trait RequiredBinaryComparatorsExecutionApp extends ExecutionApp {
     implicit def ordSer[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]
+
+    /**
+     * Mode applied to the OrderedSerialization check. Defaults to Fail; override
+     * with RequireOrderedSerializationMode.Log to only log violations.
+     */
+    def requireOrderedSerializationMode: RequireOrderedSerializationMode = RequireOrderedSerializationMode.Fail
 
     override def config(inputArgs: Array[String]): (Config, Mode) = {
       val (conf, m) = super.config(inputArgs)
-      (conf.setRequireOrderedSerialization(true), m)
+      (conf.setRequireOrderedSerializationMode(Some(requireOrderedSerializationMode)), m)
     }
   }
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparatorsConfig.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparatorsConfig.scala
index 46c30a203d..d14872d6cc 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparatorsConfig.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/RequiredBinaryComparatorsConfig.scala
@@ -2,6 +2,19 @@ package com.twitter.scalding.serialization
 
 import com.twitter.scalding.{ Config, Job }
 
+/**
+ * How a groupBy/cogroup without an OrderedSerialization is treated by the
+ * OrderedSerialization check: Fail makes the check fail, Log only logs a warning.
+ * The case object names (via toString) are the values stored in the config.
+ */
+sealed trait RequireOrderedSerializationMode
+object RequireOrderedSerializationMode {
+  case object Fail extends RequireOrderedSerializationMode
+  case object Log extends RequireOrderedSerializationMode
+}
+
 trait RequiredBinaryComparatorsConfig extends Job {
-  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
+  /** Override with RequireOrderedSerializationMode.Log to only log violations. */
+  def requireOrderedSerializationMode: RequireOrderedSerializationMode = RequireOrderedSerializationMode.Fail
+  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode.toString)
 }
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala
index 0baca83837..a54439a482 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala
@@ -18,12 +18,13 @@ package com.twitter.scalding
 import com.twitter.scalding.serialization.CascadingBinaryComparator
 import com.twitter.scalding.serialization.OrderedSerialization
 import com.twitter.scalding.serialization.StringOrderedSerialization
+import com.twitter.scalding.serialization.RequireOrderedSerializationMode
 
 import org.scalatest.{ Matchers, WordSpec }
 
-class NoOrderdSerJob(args: Args) extends Job(args) {
+class NoOrderdSerJob(args: Args, requireOrderedSerializationMode: String) extends Job(args) {
 
-  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
+  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode)
 
   TypedPipe.from(TypedTsv[(String, String)]("input"))
     .group
@@ -31,11 +32,11 @@ class NoOrderdSerJob(args: Args) extends Job(args) {
     .write(TypedTsv[(String, String)]("output"))
 }
 
-class OrderdSerJob(args: Args) extends Job(args) {
+class OrderdSerJob(args: Args, requireOrderedSerializationMode: String) extends Job(args) {
 
   implicit def stringOS: OrderedSerialization[String] = new StringOrderedSerialization
 
-  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
+  override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode)
 
   TypedPipe.from(TypedTsv[(String, String)]("input"))
     .group
@@ -45,29 +46,82 @@ class OrderdSerJob(args: Args) extends Job(args) {
 }
 
 class RequireOrderedSerializationTest extends WordSpec with Matchers {
+
   "A NoOrderedSerJob" should {
-    // throw if we try to run in:
-    "throw when run" in {
+
+    def test(job: Args => Job) =
+      JobTest(job)
+        .source(TypedTsv[(String, String)]("input"), List(("a", "a"), ("b", "b")))
+        .sink[(String, String)](TypedTsv[(String, String)]("output")) { outBuf => () }
+        .run
+        .finish()
+
+    "throw when mode is Fail" in {
+      val ex = the[Exception] thrownBy {
+        test(new NoOrderdSerJob(_, RequireOrderedSerializationMode.Fail.toString))
+      }
+      ex.getMessage should include("SerializationTest.scala:")
+    }
+
+    "not throw when mode is Log" in {
+      test(new NoOrderdSerJob(_, RequireOrderedSerializationMode.Log.toString))
+    }
+
+    "throw when mode is true" in {
       val ex = the[Exception] thrownBy {
-        JobTest(new NoOrderdSerJob(_))
-          .source(TypedTsv[(String, String)]("input"), List(("a", "a"), ("b", "b")))
-          .sink[(String, String)](TypedTsv[(String, String)]("output")) { outBuf => () }
-          .run
-          .finish()
+        test(new NoOrderdSerJob(_, "true"))
       }
       ex.getMessage should include("SerializationTest.scala:")
     }
+
+    "not throw when mode is false" in {
+      test(new NoOrderdSerJob(_, "false"))
+    }
   }
+
   "A OrderedSerJob" should {
-    // throw if we try to run in:
-    "run" in {
-      JobTest(new OrderdSerJob(_))
+
+    def test(job: Args => Job) =
+      JobTest(job)
         .source(TypedTsv[(String, String)]("input"), List(("a", "a"), ("a", "b"), ("b", "b")))
         .sink[(String, String)](TypedTsv[(String, String)]("output")) { outBuf =>
           outBuf.toSet shouldBe Set(("a", "b"), ("b", "b"))
         }
         .run
         .finish()
+
+    "run when mode is Fail" in {
+      test(new OrderdSerJob(_, RequireOrderedSerializationMode.Fail.toString))
+    }
+
+    "run when mode is Log" in {
+      test(new OrderdSerJob(_, RequireOrderedSerializationMode.Log.toString))
+    }
+
+    "run when mode is true" in {
+      test(new OrderdSerJob(_, "true"))
+    }
+
+    "run when mode is false" in {
+      test(new OrderdSerJob(_, "false"))
     }
   }
+
+  "Config.getRequireOrderedSerializationMode" should {
+    def modeOf(value: String): Option[RequireOrderedSerializationMode] =
+      (Config.empty + (Config.ScaldingRequireOrderedSerialization -> value)).getRequireOrderedSerializationMode
+
+    "parse values case-insensitively, treating unknown values as disabled" in {
+      modeOf("FAIL") shouldBe Some(RequireOrderedSerializationMode.Fail)
+      modeOf("true") shouldBe Some(RequireOrderedSerializationMode.Fail)
+      modeOf(" Log ") shouldBe Some(RequireOrderedSerializationMode.Log)
+      modeOf("false") shouldBe None
+      Config.empty.getRequireOrderedSerializationMode shouldBe None
+    }
+
+    "be cleared by setRequireOrderedSerializationMode(None)" in {
+      val conf = Config.empty.setRequireOrderedSerializationMode(Some(RequireOrderedSerializationMode.Fail))
+      conf.setRequireOrderedSerializationMode(None).getRequireOrderedSerializationMode shouldBe None
+    }
+  }
 }