
Thread: Re: writing output files in hadoop streaming




Re: writing output files in hadoop streaming
United States
2008-01-15 14:56:12

Also, this gives you a solution to your race condition (by using hadoop's
mechanisms) and it also gives you much higher
throughput/reliability/scalability than writing to NFS can possibly give
you.
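Ted describes the mechanism further down the thread: each task gets a temporary directory, which is promoted on success and deleted on failure. The following is a minimal Python sketch of that commit/abort pattern on a local filesystem. It is an illustration under assumptions, not Hadoop's implementation. The `_temporary/<attempt_id>` layout and the `run_task_attempt` name are hypothetical, and in Hadoop the cleanup is done by the framework, not by the task itself.

```python
import os
import shutil


def run_task_attempt(output_dir, attempt_id, work):
    """Run one task attempt so that only a successful attempt leaves files.

    `work` is a callable that writes its files into the directory it is
    given. The `_temporary/<attempt_id>` layout is a hypothetical stand-in
    for Hadoop's per-task temporary directories.
    """
    tmp_dir = os.path.join(output_dir, "_temporary", attempt_id)
    os.makedirs(tmp_dir)
    try:
        work(tmp_dir)
    except BaseException:
        # Abort: the attempt died, so its partial files are simply deleted.
        shutil.rmtree(tmp_dir)
        raise
    # Commit: move each file to its final place, then drop the temp dir.
    for name in os.listdir(tmp_dir):
        os.replace(os.path.join(tmp_dir, name), os.path.join(output_dir, name))
    os.rmdir(tmp_dir)
```

A killed or superseded attempt never reaches the commit loop, so a half-written file can never appear under its final name. This only works if the task writes relative paths, which matches Ted's caveat about absolute path names.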


On 1/15/08 12:54 PM, "Miles Osborne" <milesinf.ed.ac.uk> wrote:

> surely the clean way (in a streaming environment) would be to define a
> representation of some kind which serialises the output.
> 
> http://en.wikipedia.org/wiki/Serialization
> 
> after your mappers and reducers have completed, you would then have some
> code which deserialises (unpacks) the output as desired. this would easily
> allow you to reconstruct the two files from a single (set) of file
> fragments.
> 
> this approach would entail defining the serialisation / deserialisation
> process in a way which was distinct from the actual mappers / reducers and
> then having a little compilation process take that definition and both
> create the necessary serialisers / deserialisers and also serve as
> documentation.
> 
> it does have extra overhead, but in the long run it is worth it, since the
> interfaces are actually documented.
> 
> Miles
> 
> On 15/01/2008, John Heidemann <johnhisi.edu> wrote:
>> 
>> On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote:
>>> 
>>> Regarding the race condition, hadoop builds task specific temporary
>>> directories in the output directory, one per reduce task, that hold these
>>> output files (as long as you don't use absolute path names).  When the
>>> process completes successfully, the output files from that temporary
>>> directory are moved to the correct place and the temporary task-specific
>>> directory is deleted.  If the reduce task dies or is superseded by another
>>> task, then the directory is simply deleted.  The file is not kept in memory
>>> pending write.
>>> 
>>> I am curious about how to demarcate the image boundaries in your current
>>> output.  Hadoop streaming makes the strong presumption of line orientation.
>>> If that isn't valid for your output, then you may have a program that is
>>> only accidentally working by finding line boundaries in binary data.  In
>>> particular, you may someday have a situation where some of the data has one
>>> kind of line boundary that is recognized, but on output the corresponding
>>> boundary is generated in a different form.  For instance, if your program
>>> sees CR-LF, it might take the pair as a line boundary and emit just LF.
>>> Even if this is not happening now, you may be in for some trouble
>>> later.
>> 
>> I think Yuri left out a bit about what we're doing.
>> He wasn't clear about what files we're talking about writing.
>> Let me try to clarify.
>> 
>> As context, all this is in Hadoop streaming.
>> 
>> Here's one way, the "side-effect way" (this is what we're doing now):
>> 
>> In principle, we'd like to not output ANYTHING to stdout from streaming.
>> Instead, we create new files somewhere in the shared Unix filespace.
>> Basically, these files are side-effects of the map/reduce computation.
>> 
>> This approach is described in Dean & Ghemawat section 4.5
>> (Side-effects), with the caveat that the user must be responsible for
>> making any side-effect atomic.
>> 
>> Our problem is, I think, that duplicated reducers scheduled for
>> straggler elimination can result in extra, partial side-effect files.
>> We're trying to figure out how to clean them up properly.
>> 
>> Currently it seems that prematurely terminated reducers (due to canceled
>> straggler elimination jobs) are not told they are terminated.  They just
>> get a SIGPIPE because their write destination goes away.
>> 
>> This prompted Yuri's first question:
>> 
>>>>>> 1. Is there an easy way to tell in a script launched by the Hadoop
>>>>>>    streaming, if the script was terminated before it received complete
>>>>>>    input?
>> 
>> To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
>> they can catch it and clean up properly.  Otherwise there seems to be no
>> clean way to distinguish a half-run job from a fully run job that
>> happens to have less input.  (I.e., there is no way for our reducer to do
>> a commit or abort properly.)
>> 
>> (It would be nicer to send an in-band termination signal down stdin, but
>> I don't think a streaming reducer can do that.)
>> 
>> So what do the Hadoop architects think about side-effects and recovering
>> from half-run jobs?  Does hadoop intend to support side-effects (for
>> interested users, obviously not as standard practice)?  If we were in
>> Java would we get a signal we could use to do cleanup?
>> 
>> What do the Hadoop streaming people think?  Is this just a bug that
>> streaming is not propagating a signal that appears in Javaland?
>> 
>> 
>> 
>> There's a second way, which is where most of the discussion has gone,
>> call it the "proper" way:
>> 
>> Rather than writing files as side-effects, the argument is to just
>> output the data with the standard hadoop mechanism.  In streaming, this
>> means through stdout.
>> 
>> Which prompted Yuri's second question:
>>>>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>>>>    script *and have Hadoop clean up those files* when it decides to
>>>>>>    destroy the task?  If there was just one file, I could simply use
>>>>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.
>> 
>> But I actually think this is not viable for us,
>> because we're writing images which are binary.
>> As per Doug's comment:
>> 
>>> If that isn't valid for your output, then you may have a program that is
>>> only accidentally working by finding line boundaries in binary data.
>> 
>> (Doug, we're not doing it this way right now.)
>> 
>> That said, if it worked, this way is clearly a lot cleaner, since Hadoop
>> already handles commit/abort for half-run jobs.  Basically all of our
>> half-run problems go away.  But they're replaced with File Format
>> Problems.
>> 
>> If we were in Java, we could write our own OutputRecord class.  This is
>> what Runping suggested and Yuri was discussing.  I don't think that works
>> for us (because we're not in Java, although I suppose it might be made to
>> work).
>> 
>> If we go that way, then we're basically packing many files into one.
>> It seems to me cleanest, if one wants to do that, to use some
>> existing format, like tar or zip or cpio, or maybe the hadoop multi-file
>> format.  But this way seems fraught with peril, since we have to fight
>> streaming and custom record output, and then still extract the files
>> after output completes anyway.  Lots and lots of work---it feels like
>> this can't be right.
>> 
>> (Another hacky way to make this work in streaming is to convert binary
>> to ASCII, like base-64-ize the files.  Been there in SQL.  Done that.
>> Don't want to do it again.  It still has all the encoding and
>> post-processing junk.)
>> 
>> 
>> 
>> Yuri had a very clever hack that merges the two schemes.  He writes to
>> random filenames as side-effects, but then writes the side-effect
>> filenames as hadoop output.  Therefore Hadoop handles commit/abort, and
>> post run he just collects the files that appear in Hadoop's part-*
>> output and discards the others.
>> 
>> This hack works, but IMHO the reducer should do the commit/abort of
>> side-effects, not some post-processing job.
>> 
>> 
>> So any thoughts about supporting side-effects?
>> 
>> 
>>    -John
>> 
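Ted's warning about line boundaries in binary data is easy to reproduce. The sketch below assumes a filter that splits its input on any line ending and re-emits each line with LF. This is a hypothetical stand-in for a careless line-oriented script, not Hadoop streaming's own code. The 8-byte PNG file signature happens to contain both a CR-LF pair and a lone LF, so an image passed through such a filter is silently corrupted.

```python
def line_oriented_copy(data: bytes) -> bytes:
    """Hypothetical line-based filter: split on CR, LF or CR-LF, emit LF."""
    return b"".join(line + b"\n" for line in data.splitlines())


# The standard 8-byte PNG signature: 0x89 'P' 'N' 'G' CR LF 0x1A LF.
png_signature = b"\x89PNG\r\n\x1a\n"

copied = line_oriented_copy(png_signature)
print(len(png_signature), len(copied))  # 8 7
```

The CR byte is dropped, the data shrinks by one byte, and nothing reports an error. This is the "accidentally working" failure mode Ted describes.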

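Miles's serialization suggestion and the base-64 workaround John mentions can be combined into a very small line-safe format. The sketch below rests on several assumptions:

* Each file becomes one record of the form `name<TAB>base64(bytes)`.
* File names contain no tabs or newlines.
* `pack` and `unpack` are hypothetical helpers.

Tab is Hadoop streaming's default key/value separator, so the file name would act as the record key.

```python
import base64


def pack(files):
    """Serialise {name: bytes} into text lines, one file per line.

    base64 output contains no CR or LF, so each record survives
    line-oriented processing intact.
    """
    for name, data in sorted(files.items()):
        yield name + "\t" + base64.b64encode(data).decode("ascii")


def unpack(lines):
    """Inverse of pack(): rebuild {name: bytes} from the record lines."""
    files = {}
    for line in lines:
        name, _, encoded = line.rstrip("\r\n").partition("\t")
        files[name] = base64.b64decode(encoded)
    return files


images = {"a.png": b"\x89PNG\r\n\x1a\n", "b.bin": bytes(range(256))}
records = list(pack(images))
print(len(records[1]) - len("b.bin\t"))  # 344: 256 raw bytes as base64
```

The roughly 4/3 size growth and the extra decode pass are the "encoding and post-processing junk" John objects to. In exchange, Hadoop's own commit/abort then covers every image.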

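Yuri's hybrid scheme can be sketched in a few lines of Python. The sketch has two parts, under assumptions:

* `write_side_effect` runs inside the reducer. It saves an image under a random name in a shared directory and prints that name to stdout, so the name only reaches the job output if Hadoop commits the task.
* `collect_committed` runs after the job. It keeps the files named in `part-*` and deletes the orphans left by killed or duplicate attempts.

Both function names and the one-filename-per-line output format are hypothetical.

```python
import glob
import os
import uuid


def write_side_effect(side_dir, data):
    """Reducer side: write `data` under a random name and report the name."""
    name = uuid.uuid4().hex + ".img"
    with open(os.path.join(side_dir, name), "wb") as f:
        f.write(data)
    print(name)  # becomes a record in this task's part-* file if committed
    return name


def collect_committed(side_dir, job_output_dir):
    """Post-run: keep files named in part-* output, delete everything else."""
    committed = set()
    for part in glob.glob(os.path.join(job_output_dir, "part-*")):
        with open(part) as f:
            for line in f:
                # Keep only the text before any tab, in case a separator
                # follows the key in the output line.
                name = line.split("\t", 1)[0].strip()
                if name:
                    committed.add(name)
    removed = []
    for name in os.listdir(side_dir):
        if name not in committed:
            os.remove(os.path.join(side_dir, name))
            removed.append(name)
    return sorted(removed)
```

As John notes, the commit/abort decision here is still Hadoop's. The cost is a separate cleanup pass after every job.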
Re: writing output files in hadoop streaming
United States
2008-01-16 10:48:50
>On 1/15/08 12:54 PM, "Miles Osborne" <milesinf.ed.ac.uk> wrote:
>
>> surely the clean way (in a streaming environment) would be to define a
>> representation of some kind which serialises the output.
>> 
>> http://en.wikipedia.org/wiki/Serialization
>> 
>> after your mappers and reducers have completed, you would then have some
>> code which deserialises (unpacks) the output as desired. this would easily
>> allow you to reconstruct the two files from a single (set) of file
>> fragments.


On Tue, 15 Jan 2008 12:56:12 PST, Ted Dunning wrote: 
>Also, this gives you a solution to your race condition (by using hadoop's
>mechanisms) and it also gives you much higher
>throughput/reliability/scalability than writing to NFS can possibly give
>you.
>

I agree that serializing and using the standard Hadoop output stream
best leverages the Hadoop mechanisms.  I even labeled it the "proper"
way, and talked about serialization (without using that word):

>> On 15/01/2008, John Heidemann <johnhisi.edu> wrote:
>>...
>>> There's a second way, which is where most of the discussion has gone,
>>> call it the "proper" way:
>>> 
>>> Rather than writing files as side-effects, the argument is to just
>>> output the data with the standard hadoop mechanism.  In streaming, this
>>> means through stdout.
>>>...
>>> But I actually think this is not viable for us,
>>> because we're writing images which are binary.
>>>...
>>> If we go that way, then we're basically packing many files into one.
>>> It seems to me cleanest, if one wants to do that, to use some
>>> existing format, like tar or zip or cpio, or maybe the hadoop multi-file
>>> format.  But this way seems fraught with peril, since we have to fight
>>> streaming and custom record output, and then still extract the files
>>> after output completes anyway.  Lots and lots of work---it feels like
>>> this can't be right.
>>> 
>>> (Another hacky way to make this work in streaming is to convert binary
>>> to ASCII, like base-64-ize the files.  Been there in SQL.  Done that.
>>> Don't want to do it again.  It still has all the encoding and
>>> post-processing junk.)

BUT...I'm suggesting that should not be the ONLY viable way.

Two reasons:  first, yes, serialization can work.  But you've put a lot
of layers of junk in the way, all of which have to be done and undone.
This can easily become a lot of code, and it can easily eat into any
performance advantage.

On the other hand, if Hadoop would just send a signal to the aborted
reducer, rather than just closing stdin/stdout, then a few lines of
signal capture code and a few more to unlink the temp file do
everything, plus a few lines of signal sending code in Hadoop streaming.
Add a few on the commit side, and you end up with about 50 lines of
code.  Serialization, by contrast, means hundreds of lines of stubs,
or large libraries to handle something like tar or zip, plus
potentially storage overhead (if you convert to base-64 or something),
and storage overhead because you have to store (at least temporarily)
both serialized and unserialized versions.
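A minimal Python sketch of the reducer half of this proposal follows. It assumes what the thread says streaming does not do at the time: that a cancelled reducer receives SIGTERM (John also suggests SIGUSR1) instead of only losing its pipes. The side-effect file is written to a temporary name next to its final path and renamed on a normal end of input. If the signal arrives first, the handler unlinks it instead. `SideEffectFile`, `install_abort_handler` and `run_reducer` are hypothetical names.

```python
import os
import signal
import sys
import tempfile


class SideEffectFile:
    """A side-effect file that becomes visible only when committed."""

    def __init__(self, final_path):
        self.final_path = final_path
        directory = os.path.dirname(os.path.abspath(final_path))
        # Temp file in the same directory, so the commit is a plain rename.
        fd, self.tmp_path = tempfile.mkstemp(dir=directory, prefix=".partial-")
        self.f = os.fdopen(fd, "wb")

    def write(self, data):
        self.f.write(data)

    def commit(self):
        self.f.close()
        # rename(2) is atomic within a single POSIX filesystem.
        os.replace(self.tmp_path, self.final_path)

    def abort(self):
        self.f.close()
        if os.path.exists(self.tmp_path):  # nothing to remove after a commit
            os.unlink(self.tmp_path)


def install_abort_handler(out, signum=signal.SIGTERM):
    """Assumed protocol: the framework signals a cancelled reducer."""
    def on_abort(received, frame):
        out.abort()
        sys.exit(1)
    signal.signal(signum, on_abort)


def run_reducer(final_path):
    """Copy stdin into a side-effect file; commit only on a clean EOF."""
    out = SideEffectFile(final_path)
    install_abort_handler(out)
    for chunk in iter(lambda: sys.stdin.buffer.read(65536), b""):
        out.write(chunk)
    out.commit()
```

The sketch depends entirely on the signal. Without it, a closed stdin looks exactly like end of input, and `run_reducer` would commit a partial file. That is the half-run ambiguity John describes earlier in the thread.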

Second, the Google folks found side-effects useful enough that they
support them, documented them in Dean and Ghemawat, and seem to use them
internally.  Perhaps Hadoop should consider the costs of supporting
side-effects before discarding them?


Going back to part of Ted's comment and his performance objection:

On Tue, 15 Jan 2008 12:56:12 PST, Ted Dunning wrote: 
>Also, this gives you a solution to your race condition (by using hadoop's
>mechanisms) and it also gives you much higher
>throughput/reliability/scalability than writing to NFS can possibly give
>you.

About the throughput issues: if you don't want to write to NFS (we can
at our current cluster size, but I know others are luckier than us),
just write side-effect files into HDFS to get all the
throughput/reliability/scalability you would get with Hadoop's standard
mechanisms.


To try and clarify what I'm hearing, though, I think the answer I'm
hearing to my question:

>So what do the Hadoop architects think about side-effects and recovering
>from half-run jobs?  Does hadoop intend to support side-effects (for
>interested users, obviously not as standard practice)?  If we were in
>Java would we get a signal we could use to do cleanup?

is that Hadoop does NOT currently support side-effects, because people
didn't really consider it.

And there's some push-back against side-effects as being not very
clean.  (Which I agree with to a first order, but not strongly enough
that I think it should be forbidden.)

Are folks so anti-side-effect that if we submit the 10-line
signal-sending patch to streaming it will be given a -1?  (Footnote:
it's a 10-line C patch; I have to confirm what it looks like in Java.)

   -John Heidemann


