Separate planning from optimization by johnynek · Pull Request #1731 · twitter/scalding · GitHub

Separate planning from optimization #1731

Merged · 19 commits · Oct 9, 2017

@johnynek (Collaborator, author) commented Oct 5, 2017

This PR completely separates planning from optimization.

When converting to a Cascading Pipe, we now do a literal, node-by-node translation. Before that translation, we apply several phases of optimization.

We want more optimizations in the future, but right now the priority is getting the tests to pass. Some of our tests were actually relying on the existing optimizations, and performance will certainly regress if we stop doing the optimizations we used to do.

In a future PR, I want to make the optimizations more configurable. That will also make testing easier: if we can run a TypedPipe without any optimizations, we can check that the optimizations never change the logic of the job.

cc @ianoc @non
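A minimal sketch of the plan described above, assuming a toy monomorphic IR rather than Scalding's TypedPipe: optimization is a list of phases (each a Pipe => Pipe rewrite) applied before a literal, node-by-node evaluation, so an empty phase list gives the "no optimizations" baseline to compare against. All names here (Pipe, MapP, FilterP, fuseMaps, optimize, run) are hypothetical.

```scala
// Toy pipeline IR (hypothetical; not Scalding's TypedPipe).
sealed trait Pipe
final case class Source(values: List[Int]) extends Pipe
final case class MapP(in: Pipe, f: Int => Int) extends Pipe
final case class FilterP(in: Pipe, p: Int => Boolean) extends Pipe

// One optimization phase: fuse adjacent maps, bottom-up.
def fuseMaps(p: Pipe): Pipe = p match {
  case MapP(in, g) =>
    fuseMaps(in) match {
      case MapP(inner, f) => MapP(inner, f andThen g)
      case other          => MapP(other, g)
    }
  case FilterP(in, q) => FilterP(fuseMaps(in), q)
  case s: Source      => s
}

val defaultPhases: List[Pipe => Pipe] = List(p => fuseMaps(p))

// Apply each phase in order; Nil means "run without optimizations".
def optimize(p: Pipe, phases: List[Pipe => Pipe]): Pipe =
  phases.foldLeft(p)((acc, phase) => phase(acc))

// Literal translation: every node is evaluated exactly as written.
def run(p: Pipe): List[Int] = p match {
  case Source(vs)     => vs
  case MapP(in, f)    => run(in).map(f)
  case FilterP(in, q) => run(in).filter(q)
}

// Number of nodes in the plan.
def size(p: Pipe): Int = p match {
  case Source(_)      => 1
  case MapP(in, _)    => 1 + size(in)
  case FilterP(in, _) => 1 + size(in)
}

val job: Pipe =
  MapP(FilterP(MapP(MapP(Source(List(1, 2, 3, 4)), _ + 1), _ * 2), _ > 4), _ - 1)

println(run(optimize(job, Nil)))           // List(5, 7, 9)
println(run(optimize(job, defaultPhases))) // List(5, 7, 9)
```

The optimized plan has one node fewer, and comparing it against the unoptimized run is exactly the kind of check a configurable optimizer would make easy.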

* a.map(f).flatMap(g) == a.flatMap { x => g(f(x)) }
* a.flatMap(f).map(g) == a.flatMap { x => f(x).map(g) }
*
* This is a rule you may want to apply after having
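The two identities in the doc comment above can be spot-checked on an ordinary List, which stands in for TypedPipe here only so the snippet runs without Scalding; the helper names are hypothetical. Each identity is what lets an optimizer replace two chained nodes with a single flatMap.

```scala
// a.map(f).flatMap(g) == a.flatMap { x => g(f(x)) }
def mapThenFlatMapLaw[A, B, C](a: List[A])(f: A => B)(g: B => List[C]): Boolean =
  a.map(f).flatMap(g) == a.flatMap { x => g(f(x)) }

// a.flatMap(f).map(g) == a.flatMap { x => f(x).map(g) }
def flatMapThenMapLaw[A, B, C](a: List[A])(f: A => List[B])(g: B => C): Boolean =
  a.flatMap(f).map(g) == a.flatMap { x => f(x).map(g) }

val input = List(1, 2, 3)
println(mapThenFlatMapLaw(input)(_ * 10)(y => List(y, y + 1)))  // true
println(flatMapThenMapLaw(input)(x => List(x, -x))(_.toString)) // true
```

The separate parameter lists are a design choice for Scala 2 type inference: each function literal sees fully solved parameter types.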
Collaborator:

Does Dagon's rule optimization support this notion? That is, do you run each rule to a fixed point, or do you run all the rules in a loop?

Collaborator (author):

Rules don't, but you can apply a sequence of rules to get this behavior. (I forgot to use that API in this PR; using it will clean things up a bit.)
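The difference between the two strategies in the question can change the result. Below is a toy rewriter, assuming a small arithmetic expression tree rather than Dagon's Dag and Rule types (all names are hypothetical): eachToStability runs each rule to a fixed point in order, while allInLoop combines the rules with orElse and repeats until nothing changes. With the rules in the order [mulOne, foldConst], the first strategy misses a simplification that only becomes possible after constant folding.

```scala
// Toy expression tree and rewrite rules (hypothetical; not Dagon's API).
sealed trait Expr
final case class Lit(n: Int) extends Expr
final case class Var(name: String) extends Expr
final case class Add(l: Expr, r: Expr) extends Expr
final case class Mul(l: Expr, r: Expr) extends Expr

type Rule = PartialFunction[Expr, Expr]

val foldConst: Rule = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
  case Mul(Lit(a), Lit(b)) => Lit(a * b)
}
val mulOne: Rule = {
  case Mul(x, Lit(1)) => x
  case Mul(Lit(1), x) => x
}

// One bottom-up pass: rewrite children first, then try the rule at this node.
def rewriteOnce(e: Expr, rule: Rule): Expr = {
  val withKids = e match {
    case Add(l, r) => Add(rewriteOnce(l, rule), rewriteOnce(r, rule))
    case Mul(l, r) => Mul(rewriteOnce(l, rule), rewriteOnce(r, rule))
    case leaf      => leaf
  }
  rule.applyOrElse(withKids, (x: Expr) => x)
}

// Repeat passes until the tree stops changing.
def toFixpoint(e: Expr, rule: Rule): Expr = {
  val next = rewriteOnce(e, rule)
  if (next == e) e else toFixpoint(next, rule)
}

// Strategy 1: run each rule to stability, one after another.
def eachToStability(e: Expr, rules: List[Rule]): Expr =
  rules.foldLeft(e)((acc, rule) => toFixpoint(acc, rule))

// Strategy 2: combine all rules and loop until nothing changes.
def allInLoop(e: Expr, rules: List[Rule]): Expr =
  toFixpoint(e, rules.foldLeft(PartialFunction.empty[Expr, Expr])(_ orElse _))

val e: Expr = Mul(Var("x"), Add(Lit(0), Lit(1)))
println(eachToStability(e, List(mulOne, foldConst))) // Mul(Var(x),Lit(1))
println(allInLoop(e, List(mulOne, foldConst)))       // Var(x)
```

Reordering to [foldConst, mulOne] makes the first strategy reach Var(x) too, which is why the order of a rule sequence matters when rules are not looped together.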

object ComposeMapFlatMap extends PartialRule[TypedPipe] {
  def applyWhere[T](on: Dag[TypedPipe]) = {
    case FlatMapped(Mapped(in, f), g) =>
      FlatMapped(in, FlatMappedFn(g).runAfter(FlatMapping.Map(f)))
Collaborator:

Out of curiosity, how type-safe is writing these rules? If you swapped f and g here, would this still compile?

Collaborator (author):

It's type-safe. It would not compile if you swapped the functions, since the result type wouldn't be a TypedPipe[T].

That’s the exciting part of Dagon!

Collaborator:

🎉 !
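To illustrate the type-safety point, here is a toy typed IR loosely shaped like the Mapped and FlatMapped nodes above; it is not Scalding's actual class hierarchy, and P, Src, TypedRule, and run are hypothetical. Plain function composition (f andThen g) stands in for FlatMappedFn(g).runAfter(FlatMapping.Map(f)). Because the rule must return a P[T] for every T, writing g andThen f instead is rejected by the compiler.

```scala
// Toy typed IR (hypothetical; not Scalding's TypedPipe).
sealed trait P[T]
final case class Src[T](items: List[T]) extends P[T]
final case class Mapped[A, T](in: P[A], f: A => T) extends P[T]
final case class FlatMapped[A, T](in: P[A], f: A => List[T]) extends P[T]

// A rewrite rule must turn a P[T] into a P[T], for every T.
trait TypedRule {
  def apply[T](p: P[T]): Option[P[T]]
}

object ComposeMapFlatMap extends TypedRule {
  def apply[T](p: P[T]): Option[P[T]] = p match {
    // f: A => B and g: B => List[T], so f andThen g: A => List[T].
    // Swapping them (g andThen f) does not typecheck.
    case FlatMapped(Mapped(in, f), g) => Some(FlatMapped(in, f andThen g))
    case _                            => None
  }
}

// Reference interpreter used to check that a rewrite preserves results.
def run[T](p: P[T]): List[T] = p match {
  case Src(items)        => items
  case Mapped(in, f)     => run(in).map(f)
  case FlatMapped(in, f) => run(in).flatMap(f)
}

val p: P[Int] =
  FlatMapped(Mapped(Src(List(1, 2, 3)), (x: Int) => x * 2), (y: Int) => List(y, y + 1))
val fused = ComposeMapFlatMap(p)
println(fused.map(run(_))) // Some(List(2, 3, 4, 5, 6, 7))
```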

}

/**
* In scalding 0.17 and earlier, descriptions were automatically pushed down below
Collaborator:

For this one, I think we need this to enable function coalescing in Dagon/Scalding, right?

Collaborator (author):

Yeah, that's why I added it. Traps actually seem to require function coalescing because of the strange way they work. It seems traps are currently not totally safe: they can still throw runtime exceptions if the user confuses Cascading.

I think we can make them safe with optimization rules, but they are still confusing, since a trap only reaches down to the next write barrier, which is not always obvious in the graph. I'm not really a fan of traps, by the way.
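A toy illustration of why the description rule enables coalescing, assuming a monomorphic IR with a Described node (Node, Described, pushDown, and fuse are hypothetical names, and moving descriptions downstream is one reading of the truncated doc comment about descriptions being pushed down): a description sitting between two maps hides them from a map-fusion rule. Moving the description downstream first makes the maps adjacent, and fusion then yields a single map with the same output.

```scala
// Toy IR with a description node (hypothetical; not Scalding's classes).
sealed trait Node
final case class Source(xs: List[Int]) extends Node
final case class Mapped(in: Node, f: Int => Int) extends Node
final case class Described(in: Node, desc: String) extends Node

// Move descriptions downstream past maps:
// Mapped(Described(n, d), g) => Described(Mapped(n, g), d)
def pushDown(n: Node): Node = n match {
  case Mapped(in, g) =>
    pushDown(in) match {
      case Described(inner, d) => Described(pushDown(Mapped(inner, g)), d)
      case other               => Mapped(other, g)
    }
  case Described(in, d) => Described(pushDown(in), d)
  case s: Source        => s
}

// Fuse adjacent maps: Mapped(Mapped(n, f), g) => Mapped(n, f andThen g)
def fuse(n: Node): Node = n match {
  case Mapped(in, g) =>
    fuse(in) match {
      case Mapped(inner, f) => Mapped(inner, f andThen g)
      case other            => Mapped(other, g)
    }
  case Described(in, d) => Described(fuse(in), d)
  case s: Source        => s
}

def countMaps(n: Node): Int = n match {
  case Source(_)        => 0
  case Mapped(in, _)    => 1 + countMaps(in)
  case Described(in, _) => countMaps(in)
}

def run(n: Node): List[Int] = n match {
  case Source(xs)       => xs
  case Mapped(in, f)    => run(in).map(f)
  case Described(in, _) => run(in)
}

val job: Node = Mapped(Described(Mapped(Source(List(1, 2, 3)), _ + 1), "add one"), _ * 10)
println(countMaps(fuse(job)))           // 2: the description blocks fusion
println(countMaps(fuse(pushDown(job)))) // 1
```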

Collaborator:

Ha, I remember you not being a huge fan of them when they were added. I believe we could implement them in Scalding itself at some point, and then make them work on Spark in roughly the same way: some sort of withSideEffectingTry(), a mix of a flatMap and a try/catch that dumps the bad records into a file. But I looked through the comments related to traps in this PR, and it doesn't seem like we have particularly great semantics now; it's likely buggy. So that's all just sad, but making traps great again is out of scope for this change.
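A rough sketch of the withSideEffectingTry() idea, assuming an in-memory List for the dataset and a ListBuffer standing in for the bad-records file (flatMapWithTrap is a hypothetical name, not an existing Scalding or Spark API): each record's function runs inside a Try, and records that throw are diverted instead of failing the job.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// Applies f to every record. A record whose f throws is appended to badRecords
// (with its exception) and produces no output, instead of failing the whole job.
def flatMapWithTrap[A, B](input: List[A], badRecords: ListBuffer[(A, Throwable)])(
    f: A => List[B]): List[B] =
  input.flatMap { a =>
    Try(f(a)) match {
      case Success(bs) => bs
      case Failure(e) =>
        badRecords += ((a, e))
        Nil
    }
  }

val bad = ListBuffer.empty[(String, Throwable)]
val parsed = flatMapWithTrap(List("1", "x", "3"), bad)(s => List(s.toInt))
println(parsed)               // List(1, 3)
println(bad.map(_._1).toList) // List(x)
```

Because f returns a strict List, any exception is raised inside the Try; a lazily evaluated result (an Iterator, for example) could throw later, outside the guarded region. Try also only catches non-fatal exceptions.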

// TODO, this may be identity if the setter is the inverse of the
// converter. If we can identify this we will save allocations
val resFd = new RichFlowDef(fd)
resFd.mergeFrom(localFlowDef)
Collaborator:

mergeFrom doesn't mutate localFlowDef, I imagine?

Collaborator (author):

That's right. We have been using it for a while. It mutates the left side, but not the right.

Collaborator:

👍
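The contract described above (merging mutates the receiver and only reads the argument) can be sketched with a mutable registry of named sources. SourceRegistry is hypothetical and illustrative only; it is not Cascading's FlowDef or Scalding's RichFlowDef.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a flow definition that records named sources.
final class SourceRegistry(initial: Map[String, String]) {
  private val sources = mutable.LinkedHashMap.empty[String, String] ++= initial

  // Copies every entry of `other` into this registry (the left side).
  // `other` (the right side) is only read, never modified.
  def mergeFrom(other: SourceRegistry): Unit = sources ++= other.sources

  def snapshot: Map[String, String] = sources.toMap
}

val fd = new SourceRegistry(Map("a" -> "/in/a"))
val localFlowDef = new SourceRegistry(Map("b" -> "/in/b"))
fd.mergeFrom(localFlowDef)
println(fd.snapshot)
println(localFlowDef.snapshot)
```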

rec(FlatMapped(input, FlatMappedFn.fromMap(fn)))

case (m@MergedTypedPipe(_, _), rec) =>
OptimizationRules.unrollMerge(m) match {
Collaborator:

This seems odd here. Maybe add a comment explaining why it's not just in the optimizer flow and why we need it here too?

Collaborator (author):

The reason is that Merge has arity 2, but a Cascading merge can take a list. I don't think Cascading is happy if you pass it a merge with no items, and I didn't want to emit a merge with a single input either, though that is maybe a judgment call.

So we really have to unroll, or we will build a giant linked list of Merge nodes. This is a case where some transformation is needed.

Or did you just mean the special-casing for 0 and 1 items? Again, I think that is needed so Cascading won't blow up.

The other optimizations should definitely be in a rule (x ++ x == x.flatMap { y => List(y, y) } for instance).
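The example rule above holds only if output order does not matter, which this sketch assumes (a distributed pipe does not promise an order, so equality is checked as a multiset). The helper names are hypothetical, and List stands in for a pipe. Presumably the rewrite pays off because x then feeds one flatMap instead of being fed into a merge twice.

```scala
// Count occurrences so two lists can be compared as multisets (order ignored).
def counts[A](xs: List[A]): Map[A, Int] =
  xs.groupBy(identity).map { case (k, v) => k -> v.size }

def mergeWithSelf[A](x: List[A]): List[A] = x ++ x
def duplicateEach[A](x: List[A]): List[A] = x.flatMap { y => List(y, y) }

val x = List(1, 2, 2, 3)
println(mergeWithSelf(x)) // List(1, 2, 2, 3, 1, 2, 2, 3)
println(duplicateEach(x)) // List(1, 1, 2, 2, 2, 2, 3, 3)
println(counts(mergeWithSelf(x)) == counts(duplicateEach(x))) // true
```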

Collaborator:

It might be leaky, but I'd have thought that, ideally, we could have an invariant for this operator: if we see a MergedTypedPipe after optimization, it is always just 2 pipes. But that means we would need to introduce a new node type for a "many pipe" merge, which is maybe not worth it?
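A minimal sketch of the unrolling discussed in this thread, assuming a toy IR where Merged is binary and the planned form takes a list. Pipe, EmptyPipe, Planned, NaryMerge, and plan are hypothetical, and dropping empty inputs is an assumption about unrolling, not a description of OptimizationRules.unrollMerge. Zero and one inputs are special-cased so the planner never emits an n-ary merge with fewer than two inputs.

```scala
// Toy logical IR with a binary merge (hypothetical).
sealed trait Pipe
final case class Source(name: String) extends Pipe
case object EmptyPipe extends Pipe
final case class Merged(left: Pipe, right: Pipe) extends Pipe

// Toy physical plan where a merge takes any number of inputs.
sealed trait Planned
final case class Leaf(name: String) extends Planned
case object EmptyPlan extends Planned
final case class NaryMerge(inputs: List[Planned]) extends Planned

// Flatten a tree of binary merges into its non-merge inputs, left to right,
// dropping empty inputs.
def unrollMerge(p: Pipe): List[Pipe] = p match {
  case Merged(l, r) => unrollMerge(l) ++ unrollMerge(r)
  case EmptyPipe    => Nil
  case other        => List(other)
}

def plan(p: Pipe): Planned = p match {
  case m: Merged =>
    unrollMerge(m) match {
      case Nil           => EmptyPlan    // nothing left to merge
      case single :: Nil => plan(single) // a one-input merge is just its input
      case many          => NaryMerge(many.map(plan))
    }
  case Source(name) => Leaf(name)
  case EmptyPipe    => EmptyPlan
}

val tree: Pipe = Merged(Merged(Source("a"), Source("b")), Merged(EmptyPipe, Source("c")))
println(plan(tree)) // NaryMerge(List(Leaf(a), Leaf(b), Leaf(c)))
```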

@ianoc (Collaborator) commented Oct 5, 2017

Obviously a huge change, so it's kind of hard to be sure it's all right. But Scalding has pretty extensive tests, so between that and the read-through I did, I think this is a 👍.

Good to get more convergence into the one flow. The separation continues to look better and better. Nice work!

@johnynek (Collaborator, author) commented Oct 7, 2017

@ianoc I had to make a couple of minor changes to make the tests pass.

  1. A line-number check was testing not only the line numbers, but also that they were applied to the very last pipe, which is a very strong contract to enforce.

  2. I went ahead and moved the force-before-hashjoin rule into the optimizer, since without it the tests didn't pass. The old rules were a bit brittle and tied to how we planned to Cascading (the rules also get clearer this way, if you ask me).

  3. There was a bug in filter composition: rather than f(x) && g(x), I was doing f(x) && f(x), which typechecks but is of course wrong.
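Item 3 is easy to miss because both versions have type A => Boolean. A minimal sketch with plain Scala functions (composeFilters and composeFiltersBuggy are hypothetical names, not Scalding's filter-composition code), checking the fused predicate against applying the two filters one after the other:

```scala
// Correct: keep x only if both predicates accept it.
def composeFilters[A](f: A => Boolean, g: A => Boolean): A => Boolean =
  x => f(x) && g(x)

// The bug from item 3: it typechecks, but g is never called.
def composeFiltersBuggy[A](f: A => Boolean, g: A => Boolean): A => Boolean =
  x => f(x) && f(x)

val xs = (1 to 10).toList
val even: Int => Boolean = _ % 2 == 0
val big: Int => Boolean = _ > 5

// Fusing two filters must match running them in sequence.
println(xs.filter(composeFilters(even, big)) == xs.filter(even).filter(big))      // true
println(xs.filter(composeFiltersBuggy(even, big)) == xs.filter(even).filter(big)) // false
```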

Can I merge? I am eager to keep working on the board:
https://github.com/twitter/scalding/projects/1

@johnynek (Collaborator, author) commented Oct 9, 2017

I'm going to take the 👍 given here: #1731 (comment) as a shipit.

I have another PR in the pipe that I want to stage. I'll fix any concerns in a subsequent PR.

@johnynek merged commit bdd5dcc into develop on Oct 9, 2017
2 participants