introduce modes for required ordered serialization by fwbrasil · Pull Request #1757 · twitter/scalding · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

introduce modes for required ordered serialization #1757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import java.net.URI

import scala.collection.JavaConverters._
import scala.util.{ Failure, Success, Try }
import com.twitter.scalding.serialization.RequireOrderedSerializationMode

/**
* This is a wrapper class on top of Map[String, String]
Expand Down Expand Up @@ -138,17 +139,30 @@ trait Config extends Serializable {
def setMapSideAggregationThreshold(count: Int): Config =
this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString)

@deprecated("Use setRequireOrderedSerializationMode", "12/14/17")
def setRequireOrderedSerialization(b: Boolean): Config = {
  // Legacy boolean form: the flag is stored as its string rendering
  // under the ScaldingRequireOrderedSerialization key.
  val flag = b.toString
  this + (ScaldingRequireOrderedSerialization -> flag)
}

@deprecated("Use getRequireOrderedSerializationMode", "12/14/17")
def getRequireOrderedSerialization: Boolean =
  // Legacy boolean view: true only when the configured mode is Fail;
  // any other mode, or no mode at all, reads as false.
  getRequireOrderedSerializationMode match {
    case Some(RequireOrderedSerializationMode.Fail) => true
    case _ => false
  }

/**
* Set this configuration option to require all grouping/cogrouping
* to use OrderedSerialization
*/
def setRequireOrderedSerialization(b: Boolean): Config =
this + (ScaldingRequireOrderedSerialization -> (b.toString))
def setRequireOrderedSerializationMode(r: Option[RequireOrderedSerializationMode]): Config =
  r match {
    // The mode is stored via its toString under the shared config key.
    case Some(requested) =>
      this + (ScaldingRequireOrderedSerialization -> (requested.toString))
    // Nothing requested: hand back this Config untouched.
    case None =>
      this
  }

def getRequireOrderedSerialization: Boolean =
def getRequireOrderedSerializationMode: Option[RequireOrderedSerializationMode] =
get(ScaldingRequireOrderedSerialization)
.map(_.toBoolean)
.getOrElse(false)
.map(_.toLowerCase()).collect {
case "true" => RequireOrderedSerializationMode.Fail // backwards compatibility
case "fail" => RequireOrderedSerializationMode.Fail
case "log" => RequireOrderedSerializationMode.Log
}

def getCascadingSerializationTokens: Map[Int, String] =
get(Config.CascadingSerializationTokens)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ trait ExecutionContext {
// identify the flowDef
val configWithId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(configWithId).connect(flowDef)
if (config.getRequireOrderedSerialization) {

config.getRequireOrderedSerializationMode.map { mode =>
// This will throw, but be caught by the outer try if
// we have groupby/cogroupby not using OrderedSerializations
CascadingBinaryComparator.checkForOrderedSerialization(flow).get
CascadingBinaryComparator.checkForOrderedSerialization(flow, mode).get
}

flow match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.twitter.scalding.ExecutionContext.getDesc
import java.io.InputStream
import java.util.Comparator
import scala.util.{ Failure, Success, Try }
import org.slf4j.LoggerFactory

/**
* This is the type that should be fed to cascading to enable binary comparators
Expand All @@ -40,11 +41,13 @@ class CascadingBinaryComparator[T](ob: OrderedSerialization[T]) extends Comparat

object CascadingBinaryComparator {

private val LOG = LoggerFactory.getLogger(this.getClass)

/**
* This method will walk the flowDef and make sure all the
* groupBy/cogroups are using a CascadingBinaryComparator
*/
private[scalding] def checkForOrderedSerialization[T](flow: Flow[T]): Try[Unit] = {
private[scalding] def checkForOrderedSerialization[T](flow: Flow[T], mode: RequireOrderedSerializationMode): Try[Unit] = {
import collection.JavaConverters._
import cascading.pipe._
import com.twitter.scalding.RichPipe
Expand All @@ -53,8 +56,17 @@ object CascadingBinaryComparator {
def reduce(it: TraversableOnce[Try[Unit]]): Try[Unit] =
it.find(_.isFailure).getOrElse(Success(()))

def failure(s: String): Try[Unit] =
Failure(new RuntimeException("Cannot verify OrderedSerialization: " + s))
// Reports an unverifiable OrderedSerialization according to `mode`:
// Log only warns and yields a successful result, Fail yields a Failure
// wrapping a RuntimeException that carries the same message.
def failure(s: String): Try[Unit] = {
  val message =
    s"Cannot verify OrderedSerialization: $s. Add `import com.twitter.scalding.serialization.RequiredBinaryComparators._`"
  mode match {
    case RequireOrderedSerializationMode.Log =>
      LOG.warn(message)
      Success(())
    case RequireOrderedSerializationMode.Fail =>
      Failure(new RuntimeException(message))
  }
}

def check(s: Splice): Try[Unit] = {
val m = s.getKeySelectors.asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ object RequiredBinaryComparators {
*/
trait RequiredBinaryComparatorsExecutionApp extends ExecutionApp {
implicit def ordSer[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]

def requireOrderedSerializationMode: RequireOrderedSerializationMode = RequireOrderedSerializationMode.Fail
override def config(inputArgs: Array[String]): (Config, Mode) = {
val (conf, m) = super.config(inputArgs)
(conf.setRequireOrderedSerialization(true), m)
(conf.setRequireOrderedSerializationMode(Some(requireOrderedSerializationMode)), m)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package com.twitter.scalding.serialization

import com.twitter.scalding.{ Config, Job }

/**
 * Selects what happens when the ordered-serialization check cannot verify
 * that a grouping uses an OrderedSerialization.
 *
 * The names of the case objects are what gets written into the config
 * (via toString); they are read back case-insensitively as "fail" / "log".
 */
sealed trait RequireOrderedSerializationMode
object RequireOrderedSerializationMode {
/** The check yields a Failure wrapping a RuntimeException. */
case object Fail extends RequireOrderedSerializationMode
/** The check logs a warning and yields a successful result. */
case object Log extends RequireOrderedSerializationMode
}

trait RequiredBinaryComparatorsConfig extends Job {
override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
def requireOrderedSerializationMode: RequireOrderedSerializationMode = RequireOrderedSerializationMode.Fail
override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode.toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the .toString supposed to output here? Aren't we matching on lowercase in getRequireOrderedSerializationMode above?

}
1E0A
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@ package com.twitter.scalding
import com.twitter.scalding.serialization.CascadingBinaryComparator
import com.twitter.scalding.serialization.OrderedSerialization
import com.twitter.scalding.serialization.StringOrderedSerialization
import com.twitter.scalding.serialization.RequireOrderedSerializationMode

import org.scalatest.{ Matchers, WordSpec }

class NoOrderdSerJob(args: Args) extends Job(args) {
class NoOrderdSerJob(args: Args, requireOrderedSerializationMode: String) extends Job(args) {

override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode)

TypedPipe.from(TypedTsv[(String, String)]("input"))
.group
.max
.write(TypedTsv[(String, String)]("output"))
}

class OrderdSerJob(args: Args) extends Job(args) {
class OrderdSerJob(args: Args, requireOrderedSerializationMode: String) extends Job(args) {

implicit def stringOS: OrderedSerialization[String] = new StringOrderedSerialization

override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> "true")
override def config = super.config + (Config.ScaldingRequireOrderedSerialization -> requireOrderedSerializationMode)

TypedPipe.from(TypedTsv[(String, String)]("input"))
.group
Expand All @@ -45,29 +46,64 @@ class OrderdSerJob(args: Args) extends Job(args) {
}

class RequireOrderedSerializationTest extends WordSpec with Matchers {

"A NoOrderedSerJob" should {
// throw if we try to run in:
"throw when run" in {

// Runs the given job against a fixed two-row input and ignores whatever
// ends up in the output sink; only completion or an exception matters here.
def test(job: Args => Job) = {
  val input = List(("a", "a"), ("b", "b"))
  val wired = JobTest(job)
    .source(TypedTsv[(String, String)]("input"), input)
    .sink[(String, String)](TypedTsv[(String, String)]("output")) { _ => () }
  wired.run.finish()
}

"throw when mode is Fail" in {
val ex = the[Exception] thrownBy {
test(new NoOrderdSerJob(_, RequireOrderedSerializationMode.Fail.toString))
}
ex.getMessage should include("SerializationTest.scala:")
}

"not throw when mode is Log" in {
test(new NoOrderdSerJob(_, RequireOrderedSerializationMode.Log.toString))
}

"throw when mode is true" in {
val ex = the[Exception] thrownBy {
JobTest(new NoOrderdSerJob(_))
.source(TypedTsv[(String, String)]("input"), List(("a", "a"), ("b", "b")))
.sink[(String, String)](TypedTsv[(String, String)]("output")) { outBuf => () }
.run
.finish()
test(new NoOrderdSerJob(_, "true"))
}
ex.getMessage should include("SerializationTest.scala:")
}

"not throw when mode is false" in {
test(new NoOrderdSerJob(_, "false"))
}
}

"A OrderedSerJob" should {
// throw if we try to run in:
"run" in {
JobTest(new OrderdSerJob(_))

def test(job: Args => Job) =
JobTest(job)
.source(TypedTsv[(String, String)]("input"), List(("a", "a"), ("a", "b"), ("b", "b")))
.sink[(String, String)](TypedTsv[(String, String)]("output")) { outBuf =>
outBuf.toSet shouldBe Set(("a", "b"), ("b", "b"))
}
.run
.finish()

"run when mode is Fail" in {
test(new OrderdSerJob(_, RequireOrderedSerializationMode.Fail.toString))
}

"run when mode is Log" in {
test(new OrderdSerJob(_, RequireOrderedSerializationMode.Log.toString))
}

"run when mode is true" in {
test(new OrderdSerJob(_, "true"))
}

"run when mode is false" in {
test(new OrderdSerJob(_, "false"))
}
}
}
0