Separate planning from optimization #1731
* a.map(f).flatMap(g) == a.flatMap { x => g(f(x)) }
* a.flatMap(f).map(g) == a.flatMap { x => f(x).map(g) }
*
* This is a rule you may want to apply after having
Does Dagon's rule optimization support this notion? That is, do you run each rule to stability, or do you run all the rules in a loop?
Rules don’t, but you can apply a sequence of rules to do this. (I forgot to use that API in this PR; using it will clean things up a bit.)
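To make the two strategies in this exchange concrete, here is a minimal, self-contained sketch. It does not use Dagon: `Expr`, `Rule`, `applyPhases`, and `applyAll` are hypothetical names invented for illustration, and rules are modeled as plain partial functions over a toy expression tree.

```scala
object RuleDemo {
  // Toy expression tree standing in for a pipeline graph (not Dagon's Dag).
  sealed trait Expr
  final case class Lit(n: Int) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Mul(l: Expr, r: Expr) extends Expr

  // A rule rewrites a node when it matches and leaves it alone otherwise.
  type Rule = PartialFunction[Expr, Expr]

  val addZero: Rule = {
    case Add(e, Lit(0)) => e
    case Add(Lit(0), e) => e
  }
  val mulOne: Rule = {
    case Mul(e, Lit(1)) => e
    case Mul(Lit(1), e) => e
  }

  // Apply the rule once at every node, children first.
  def applyOnce(rule: Rule, e: Expr): Expr = {
    val rewritten: Expr = e match {
      case Add(l, r) => Add(applyOnce(rule, l), applyOnce(rule, r))
      case Mul(l, r) => Mul(applyOnce(rule, l), applyOnce(rule, r))
      case lit: Lit  => lit
    }
    rule.applyOrElse(rewritten, (x: Expr) => x)
  }

  // Run a single rule until the tree stops changing.
  @annotation.tailrec
  def toFixedPoint(rule: Rule, e: Expr): Expr = {
    val next = applyOnce(rule, e)
    if (next == e) e else toFixedPoint(rule, next)
  }

  // Strategy 1: run each rule to stability, one phase after another.
  def applyPhases(phases: Seq[Rule], e: Expr): Expr =
    phases.foldLeft(e)((acc, rule) => toFixedPoint(rule, acc))

  // Strategy 2: try all rules together until nothing changes.
  def applyAll(rules: Seq[Rule], e: Expr): Expr =
    toFixedPoint(rules.reduce(_ orElse _), e)

  val example: Expr = Add(Lit(7), Mul(Lit(0), Lit(1)))
  // The addZero phase finishes before mulOne exposes a new `+ 0`.
  val phased: Expr = applyPhases(Seq(addZero, mulOne), example)
  val looped: Expr = applyAll(Seq(addZero, mulOne), example)
}
```

In this toy, `phased` ends up as `Add(Lit(7), Lit(0))` while `looped` reaches `Lit(7)`, which is why the order of phases matters when each rule is run to stability on its own.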
object ComposeMapFlatMap extends PartialRule[TypedPipe] {
def applyWhere[T](on: Dag[TypedPipe]) = {
case FlatMapped(Mapped(in, f), g) =>
FlatMapped(in, FlatMappedFn(g).runAfter(FlatMapping.Map(f)))
Out of curiosity, how type-safe are these rule definitions? If you swapped f and g here, would this still compile?
It’s type-safe. It would not compile if you swapped the functions since the result type wouldn’t be a TypedPipe[T]
That’s the exciting part of Dagon!
🎉 !
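The type-safety point can be illustrated without Dagon or scalding. The sketch below uses `List` in place of `TypedPipe`, and the helper names `fuseMapThenFlatMap` and `fuseFlatMapThenMap` are hypothetical. It encodes the two rewrite laws from the rule's doc comment as generic functions, so the compiler checks that the composed function has the right input and output types.

```scala
object FusionDemo {
  // Law 1: a.map(f).flatMap(g) == a.flatMap { x => g(f(x)) }
  def fuseMapThenFlatMap[A, B, C](f: A => B, g: B => List[C]): A => List[C] =
    x => g(f(x))
  // Writing `x => f(g(x))` here would not compile: g expects a B, but x is an A.

  // Law 2: a.flatMap(f).map(g) == a.flatMap { x => f(x).map(g) }
  def fuseFlatMapThenMap[A, B, C](f: A => List[B], g: B => C): A => List[C] =
    x => f(x).map(g)

  val input: List[Int] = List(1, 2, 3)
  val label: Int => String = n => "v" + n
  val chars: String => List[Char] = s => s.toList
  val repeat: Int => List[Int] = n => List.fill(n)(n)
  val negate: Int => Int = n => -n

  // Both sides of each law evaluated on a small input.
  val law1Holds: Boolean =
    input.map(label).flatMap(chars) == input.flatMap(fuseMapThenFlatMap(label, chars))
  val law2Holds: Boolean =
    input.flatMap(repeat).map(negate) == input.flatMap(fuseFlatMapThenMap(repeat, negate))
}
```

Because the type parameters tie the output of `f` to the input of `g`, a swapped composition is rejected at compile time rather than producing a wrong plan at run time.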
}

/**
* In scalding 0.17 and earlier, descriptions were automatically pushdown below
For this one, I think we need to do this to enable function coalescing in Dagon/scalding, right?
Yeah. That’s why I added it. Actually, traps seem to require function coalescing due to the strange way they work. Traps do not seem to be totally safe currently: they can still throw runtime exceptions if the user confuses cascading.
I think we can make them safe with optimization rules, but they are still confusing, since the trap only reaches down to the next write barrier, which is not always obvious in the graph. I’m not really a fan of traps, by the way.
Ha, I remember you not being a huge fan of them when they were added. I believe we could implement them in scalding land instead at some point, and actually make them work on Spark too in roughly the same way: some sort of withSideEffectingTry() that is a mix of a flatMap and a try/catch, and dumps the bad records into a file. But I looked through the comments related to traps in this PR, and it doesn't seem like we have particularly great semantics now, and the implementation is likely buggy. So that's all just sad, but making traps great again is out of scope for this change.
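A rough sketch of what such a helper could look like, purely as an illustration of the idea in the comment above. Only the name `withSideEffectingTry` comes from the discussion; the signature, the use of `List` instead of `TypedPipe`, and the in-memory buffer standing in for a file of bad records are all assumptions.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object TrapSketch {
  // Hypothetical helper: a flatMap wrapped in a try/catch. Records that make
  // `f` throw are handed to `onFailure` and dropped from the output.
  def withSideEffectingTry[A, B](input: List[A])(onFailure: (A, Throwable) => Unit)(f: A => B): List[B] =
    input.flatMap { a =>
      try List(f(a))
      catch {
        case NonFatal(e) =>
          onFailure(a, e) // the side effect, e.g. writing the record to a sink
          Nil
      }
    }

  // Stand-in for the "file of bad records" mentioned in the comment.
  val badRecords: ArrayBuffer[String] = ArrayBuffer.empty[String]

  val parsed: List[Int] =
    withSideEffectingTry(List("1", "x", "3")) { (s, _) => badRecords += s; () }(_.toInt)
}
```

Here `parsed` contains only the records that converted cleanly, and the failing input ends up in `badRecords`. Unlike a cascading trap, the scope of the error handling is exactly the function passed in, which sidesteps the write-barrier confusion described above.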
// TODO, this may be identity if the setter is the inverse of the
// converter. If we can identify this we will save allocations
val resFd = new RichFlowDef(fd)
resFd.mergeFrom(localFlowDef)
mergeFrom doesn't mutate localFlowDef, I imagine?
That's right. We have been using that for a while. It mutates the left, but not the right.
👍
rec(FlatMapped(input, FlatMappedFn.fromMap(fn)))

case (m@MergedTypedPipe(_, _), rec) =>
OptimizationRules.unrollMerge(m) match {
This seems odd here. Maybe add a comment explaining why it's not just in the optimizer flow and we need it here too?
The reason is that Merge has arity 2, but a cascading merge can take a list. I don't think cascading is happy if you pass it a merge with no items, and I didn't want to create one with a single input, but that is maybe a judgment call.
So we really have to unroll, or we will make a giant linked list of Merge nodes. This is a case where some transformation is needed.
Or did you just mean the special casing for 0 and 1 items? Again, I think that is so cascading won't blow up.
The other optimizations should definitely be in a rule (x ++ x == x.flatMap { y => List(y, y) }, for instance).
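The unrolling described here can be sketched on a toy graph. This is not the actual `OptimizationRules.unrollMerge`: the `Pipe`, `Source`, `Empty`, and `Merged` types and the `describe` helper are hypothetical stand-ins, used only to show how a chain of binary merges flattens into one list and why the 0-item and 1-item cases get special handling.

```scala
object MergeDemo {
  // Toy pipe graph: merges are binary, like MergedTypedPipe.
  sealed trait Pipe[T]
  final case class Source[T](items: List[T]) extends Pipe[T]
  final case class Empty[T]() extends Pipe[T]
  final case class Merged[T](left: Pipe[T], right: Pipe[T]) extends Pipe[T]

  // Flatten nested binary merges into the list of their non-empty inputs.
  def unrollMerge[T](p: Pipe[T]): List[Pipe[T]] = p match {
    case Merged(l, r) => unrollMerge(l) ++ unrollMerge(r)
    case Empty()      => Nil
    case other        => List(other)
  }

  // The planner-side decision: avoid handing the backend a 0- or 1-input merge.
  def describe[T](m: Merged[T]): String = unrollMerge(m) match {
    case Nil      => "empty pipe, no merge"
    case _ :: Nil => "single input, no merge"
    case many     => s"one ${many.size}-way merge"
  }

  val a: Pipe[Int] = Source(List(1))
  val b: Pipe[Int] = Source(List(2))
  val c: Pipe[Int] = Source(List(3))
  val nested: Merged[Int] = Merged(Merged(a, b), c) // (a ++ b) ++ c

  // The duplicate-merge law from the comment, checked on lists up to ordering.
  val xs: List[Int] = List(1, 2)
  val selfMergeLaw: Boolean = (xs ++ xs).sorted == xs.flatMap(y => List(y, y)).sorted
}
```

With this shape, `(a ++ b) ++ c` becomes a single three-way merge instead of two chained binary merges, and a merge whose inputs are all empty never reaches the backend.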
So it might be leaky, but I'd have thought that ideally we would have the invariant that if we see a MergedTypedPipe post-optimization, it should always be just 2 pipes. But that does mean we would need to introduce a new node type for a 'many pipe' merge, which is maybe not worth it then?
Obviously a huge change, so it's kind of hard to be sure it's all right. But scalding has pretty extensive tests, so between that and the read I did, I think this is a 👍. Good to get more convergence into the one flow. The separation continues to look better and better. Nice work!
@ianoc I had to make a couple of minor changes to make the tests pass.
Can I merge? I am eager to keep working on the board:
I'm going to take the 👍 given here: #1731 (comment) as a shipit. I have another PR in the pipe that I want to stage. Any concerns I'll fix in a subsequent PR.
This PR totally separates the planning portion from optimization.
When we are converting to a cascading Pipe, we just translate literally. Before we do that, we apply a number of phases of optimizations.
We want to have more optimizations in the future, but right now the priority is getting tests to pass. Some of our tests were actually relying on existing optimizations, and certainly performance will regress if we don't do the optimizations we used to do.
In a future PR, I want to make the optimizations more configurable, which will also make testing easier: if we can run a TypedPipe without doing any optimizations, we can make sure the optimizations never change the logic of the job.
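The testing idea in the last paragraph can be sketched as follows. This is a toy, not scalding code: `Op`, `optimize`, and `run` are hypothetical, the pipeline is restricted to `Int` records, and the only optimization is fusing adjacent maps. The point is the shape of the check: run the same job with and without optimization and compare results.

```scala
object OptimizeCheck {
  // A toy pipeline is a list of steps over Int records.
  sealed trait Op
  final case class MapOp(f: Int => Int) extends Op
  final case class FilterOp(p: Int => Boolean) extends Op

  // The only optimization in this sketch: fuse adjacent maps into one.
  def optimize(ops: List[Op]): List[Op] = ops match {
    case MapOp(f) :: MapOp(g) :: rest => optimize(MapOp(f andThen g) :: rest)
    case op :: rest                   => op :: optimize(rest)
    case Nil                          => Nil
  }

  // Literal translation: execute the steps exactly as given.
  def run(ops: List[Op], input: List[Int]): List[Int] =
    ops.foldLeft(input) {
      case (acc, MapOp(f))    => acc.map(f)
      case (acc, FilterOp(p)) => acc.filter(p)
    }

  val job: List[Op] =
    List(MapOp(_ + 1), MapOp(_ * 2), FilterOp(_ % 3 == 0), MapOp(_ - 1))
  val input: List[Int] = (1 to 10).toList

  val unoptimized: List[Int] = run(job, input)
  val optimized: List[Int] = run(optimize(job), input)
  // The property a test would assert: optimization never changes the output.
  val sameResult: Boolean = unoptimized == optimized
}
```

In a real test suite the same comparison would presumably be run over many generated jobs and inputs rather than one fixed example.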
cc @ianoc @non