
Thread: Re: writing output files in hadoop streaming




Re: writing output files in hadoop streaming
2008-01-15 11:09:07
Regarding the race condition, Hadoop builds task-specific temporary
directories in the output directory, one per reduce task, that hold these
output files (as long as you don't use absolute path names).  When the
process completes successfully, the output files from that temporary
directory are moved to the correct place and the temporary task-specific
directory is deleted.  If the reduce task dies or is superseded by another
task, then the directory is simply deleted.  The file is not kept in memory
pending write.
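The commit behaviour described above can be modelled in a few lines. This is a local-filesystem simulation, not Hadoop's code: run_task, the produce callback and the _temporary/<attempt> layout are all hypothetical, chosen only to show why a killed or superseded attempt leaves nothing behind in the output directory.

```python
import os
import shutil
import tempfile


def run_task(output_dir, task_id, produce, succeed=True):
    """Simulate one task attempt that writes through a private temporary directory.

    Hypothetical helper, not a Hadoop API.  `produce(tmp_dir)` writes the
    attempt's files; `succeed=False` models an attempt that dies or is superseded.
    """
    tmp_dir = os.path.join(output_dir, "_temporary", task_id)
    os.makedirs(tmp_dir, exist_ok=True)
    try:
        produce(tmp_dir)
        if succeed:
            # Commit: move each file from the task directory to its final place.
            for name in os.listdir(tmp_dir):
                os.replace(os.path.join(tmp_dir, name),
                           os.path.join(output_dir, name))
    finally:
        # Committed or not, the task-specific directory is then deleted.
        shutil.rmtree(tmp_dir, ignore_errors=True)
    return succeed


# Two racing attempts of the same task; only the successful one is promoted.
with tempfile.TemporaryDirectory() as out:
    def writer(payload):
        def produce(d):
            with open(os.path.join(d, "image-a.png"), "wb") as f:
                f.write(payload)
        return produce

    run_task(out, "attempt_1", writer(b"partial"), succeed=False)
    run_task(out, "attempt_0", writer(b"complete"))
    with open(os.path.join(out, "image-a.png"), "rb") as f:
        print(f.read())  # prints b'complete'
```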

I am curious about how you demarcate the image boundaries in your current
output.  Hadoop streaming makes a strong presumption of line orientation.
If that isn't valid for your output, then you may have a program that is
only accidentally working by finding line boundaries in binary data.  In
particular, you may someday have a situation where some of the data has one
kind of line boundary that is recognized, but on output the corresponding
boundary is generated in a different form.  For instance, if your program
sees CR-LF, it might take the pair as a line boundary and emit just LF.
Even if this is not happening now, you may be in for some trouble later.
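A small demonstration of the CR-LF hazard. The function below is a hypothetical stand-in for any stage that splits input into lines and re-emits them with LF; it is not how Hadoop streaming is implemented. The PNG file signature, which begins every PNG file and contains both CR-LF and LF, is enough to show the damage.

```python
def line_oriented_roundtrip(data: bytes) -> bytes:
    """Hypothetical line-oriented stage: split on LF, CR or CR-LF, re-emit each line with LF."""
    return b"".join(line + b"\n" for line in data.splitlines())


# The first eight bytes of every PNG file contain both a CR-LF pair and a bare LF.
png_signature = b"\x89PNG\r\n\x1a\n"
mangled = line_oriented_roundtrip(png_signature)
print(mangled == png_signature)  # False: the CR byte has been dropped
```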


On 1/15/08 8:57 AM, "Yuri Pradkin" <yuriISI.EDU> wrote:

> Well, in our case the reducer munches key-value pairs to
> generate images; it's conceivable that we'll have other
> reducers in the future to do other interesting things.
> So, it would be impractical to move all that code into
> a RecordWriter.  We don't want to have a new RecordWriter
> for each new job, and we'd like to keep our processing
> code in languages other than Java, which is the only reason
> to use streaming, right?
>
>
> Do you think it would be a good solution to come up with a
> "generic" version of a record writer that would take as
> input, say:
> <filename, filesize, rawbytes[filesize]>
> and do the actual writing?
>
> Will Hadoop guarantee that only one "filename" will be
> created/written to even if there are racing tasks, and that the
> file will not be corrupted?
>
> And what about memory requirements?  When filesize is large,
> would it all have to be stored in memory before it's written,
> or will Hadoop cache it in a temp file?
>
> Thanks much for your input.
>
>   -Yuri
>
> On Mon, Jan 14, 2008 at 01:06:13PM -0800, Runping Qi wrote:
>>
>> One way to achieve your goal is to implement your own
>> OutputFormat/RecordWriter classes.
>> Your reducer will emit all the key/value pairs as in the normal case.
>> Your record writer class can open multiple output files and dispatch
>> the key/value pairs to the appropriate files based on the actual values.
>> This way, the Hadoop framework takes care of all the issues related to
>> the namespace and the necessary cleanup of the output files.
>>
>>
>> Runping
>>
>>
>>> -----Original Message-----
>>> From: Yuri Pradkin [mailto:yuriISI.EDU]
>>> Sent: Monday, January 14, 2008 12:33 PM
>>> To: hadoop-userlucene.apache.org
>>> Subject: writing output files in hadoop streaming
>>>
>>> Hi,
>>>
>>> We've been using Hadoop streaming for the last 3-4 months and
>>> it has all worked out fine except for one little problem:
>>>
>>> in some situations a Hadoop reduce job gets multiple key groups
>>> and needs to write out a separate binary output file for
>>> each group.  However, when a reduce task takes too long and
>>> there is spare capacity, the task may be replicated on another
>>> node, and the two copies are basically racing each other.  One
>>> finishes cleanly and the other is terminated.  Hadoop takes care to
>>> remove the terminated job's output from HDFS, but since we're writing
>>> files from scripts, it's up to us to separate the output of cleanly
>>> finished tasks from the output of tasks that are terminated
>>> prematurely.
>>>
>>> Does somebody have answers to the following questions:
>>> 1. Is there an easy way to tell, in a script launched by Hadoop
>>>    streaming, whether the script was terminated before it received
>>>    complete input?
>>>    As far as I was able to ascertain, no signals are sent to
>>>    those Unix jobs.  They just stop receiving data from STDIN.  The
>>>    only way that seems to work for me is to process all input and
>>>    then write something to STDOUT/STDERR and see if that causes a
>>>    SIGPIPE.  But this is ugly; I hope there is a better solution.
>>>
>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>    script *and have Hadoop clean up those files* when it decides to
>>>    destroy the task?  If there were just one file, I could simply use
>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.
>>>
>>> We are writing output files to an NFS partition shared among all
>>> reducers, which makes it all slightly more complicated because of
>>> possible file overwrites.
>>>
>>> Our current solution, which is not pretty but avoids directly
>>> addressing this problem, is to write out files with random names
>>> (created with mktemp) and write to STDOUT the renaming command for
>>> each file to its desired name.  Then, as a post-processing stage, I
>>> execute all those commands and delete the remaining temporary files
>>> as duplicates/incompletes.
>>>
>>> Thanks,
>>>
>>>   -Yuri
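Runping's suggestion amounts to a writer that keeps one open file per key and routes each record to it. The class below is a hypothetical Python model of that dispatch logic; a real implementation would be a Java OutputFormat/RecordWriter pair, and the write/close methods here only loosely mirror that interface. It assumes keys are safe to use as file names.

```python
import os
import tempfile


class MultiFileRecordWriter:
    """Hypothetical model: dispatch (key, value) records to one output file per key."""

    def __init__(self, work_dir):
        self.work_dir = work_dir  # ideally the task's temporary output directory
        self.files = {}

    def write(self, key, value):
        f = self.files.get(key)
        if f is None:
            # Open lazily: the first record for a key creates that key's file.
            f = open(os.path.join(self.work_dir, key), "wb")
            self.files[key] = f
        f.write(value)

    def close(self):
        for f in self.files.values():
            f.close()
        self.files.clear()


with tempfile.TemporaryDirectory() as d:
    w = MultiFileRecordWriter(d)
    for key, value in [("img1", b"\x00\x01"), ("img2", b"\xff"), ("img1", b"\x02")]:
        w.write(key, value)
    w.close()
    print(sorted(os.listdir(d)))  # prints ['img1', 'img2']
```

Writing into the task's temporary directory is what would let the framework's commit/cleanup handle all of these files at once.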


Re: writing output files in hadoop streaming
2008-01-15 11:37:49
On Tue, Jan 15, 2008 at 09:09:07AM -0800, Ted Dunning wrote:
>
> Regarding the race condition, Hadoop builds task-specific temporary
> directories in the output directory, one per reduce task, that hold these
> output files (as long as you don't use absolute path names).  When the
> process completes successfully, the output files from that temporary
> directory are moved to the correct place and the temporary task-specific
> directory is deleted.  If the reduce task dies or is superseded by another
> task, then the directory is simply deleted.  The file is not kept in memory
> pending write.

That sounds like the "generic" record writer I sketched might work.  I'd
love to hear your comments on it.

Are there any bindings (e.g., for Perl) that would let our scripts write to
files in those temp directories (on HDFS)?  That could be another solution
to our problem.

> I am curious about how you demarcate the image boundaries in your current
> output.  Hadoop streaming makes a strong presumption of line orientation.
> If that isn't valid for your output, then you may have a program that is
> only accidentally working by finding line boundaries in binary data.  In
> particular, you may someday have a situation where some of the data has one
> kind of line boundary that is recognized, but on output the corresponding
> boundary is generated in a different form.  For instance, if your program
> sees CR-LF, it might take the pair as a line boundary and emit just LF.
> Even if this is not happening now, you may be in for some trouble later.

Currently we are not using any image boundaries.  Our current reducer
bunches up all records with the same key and feeds them to an image
generation program that writes to a unique file name on NFS (I described
it in the previous post).  Each image is in a separate file.  The image
boundary is the key boundary.

The "generic" record writer for multiple files would take a file size as
an argument, so it can know how long the rawbytes field is.
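A sketch of the framing Yuri proposes, <filename, filesize, rawbytes[filesize]>: because the size comes first, the reader consumes exactly that many bytes and never looks for line endings inside the payload. The concrete header layout (name, TAB, decimal size, LF) is an assumption, file names are assumed to contain no newline, and this shows only the byte format, not a Hadoop record writer.

```python
import io


def write_record(out, filename, data):
    """Frame one file as: name, TAB, byte count, LF, then exactly that many raw bytes."""
    out.write(f"{filename}\t{len(data)}\n".encode("utf-8"))
    out.write(data)


def read_records(inp):
    """Yield (filename, data) pairs until a clean end of stream."""
    while True:
        header = inp.readline()
        if not header:
            return
        # rsplit so that a TAB inside the file name does not confuse the parser.
        name, size = header.decode("utf-8").rstrip("\n").rsplit("\t", 1)
        data = inp.read(int(size))
        if len(data) != int(size):
            raise EOFError(f"truncated record for {name!r}")
        yield name, data


buf = io.BytesIO()
write_record(buf, "a.png", b"\x89PNG\r\n\x1a\n")  # payload with embedded line endings
write_record(buf, "b.png", b"")
buf.seek(0)
print(list(read_records(buf)))
```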

Thanks!

  -Yuri


Re: writing output files in hadoop streaming
2008-01-15 14:22:11
On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote: 
>
>Regarding the race condition, Hadoop builds task-specific temporary
>directories in the output directory, one per reduce task, that hold these
>output files (as long as you don't use absolute path names).  When the
>process completes successfully, the output files from that temporary
>directory are moved to the correct place and the temporary task-specific
>directory is deleted.  If the reduce task dies or is superseded by another
>task, then the directory is simply deleted.  The file is not kept in memory
>pending write.
>
>I am curious about how you demarcate the image boundaries in your current
>output.  Hadoop streaming makes a strong presumption of line orientation.
>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data.  In
>particular, you may someday have a situation where some of the data has one
>kind of line boundary that is recognized, but on output the corresponding
>boundary is generated in a different form.  For instance, if your program
>sees CR-LF, it might take the pair as a line boundary and emit just LF.
>Even if this is not happening now, you may be in for some trouble later.

I think Yuri left out a bit about what we're doing.
He wasn't clear about what files we're talking about
writing.
Let me try to clarify.

As context, all this is in Hadoop streaming.

Here's one way, the "side-effect way" (this is what we're doing now):

In principle, we'd like not to output ANYTHING to stdout from streaming.
Instead, we create new files somewhere in the shared Unix filespace.
Basically, these files are side-effects of the map/reduce computation.

This approach is described in Dean & Ghemawat section 4.5
(Side-effects), with the caveat that the user must be responsible for
making any side-effect atomic.
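One common way to make a single side-effect file atomic, in the sense Dean & Ghemawat describe, is to write it under a temporary name in the destination directory and rename it into place only when it is complete. A minimal sketch, assuming a local POSIX filesystem where rename replaces the target atomically; rename guarantees over NFS may be weaker, and a process killed before the rename still leaves a .partial-* file that something must sweep up, which is exactly the cleanup problem in this thread. atomic_write is a hypothetical helper.

```python
import os
import tempfile


def atomic_write(path, data):
    """Hypothetical helper: readers see either no file at `path` or the whole file."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the target directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".partial-")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # the commit point
    except BaseException:
        # A failure before the rename leaves no half-written file under `path`.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

Two racing attempts that both call atomic_write on the same name each publish a complete file; the later rename simply wins.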

Our problem is, I think, that duplicated reducers scheduled for
straggler elimination can result in extra, partial side-effect files.
We're trying to figure out how to clean them up properly.

Currently it seems that prematurely terminated reducers (due to canceled
straggler-elimination jobs) are not told they are terminated.  They just
get a SIGPIPE because their write destination goes away.

This prompted Yuri's first question:

>>>> 1. Is there an easy way to tell, in a script launched by Hadoop
>>>>    streaming, whether the script was terminated before it received
>>>>    complete input?

To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
they can catch it and clean up properly.  Otherwise there seems to be no
clean way to distinguish a half-run job from a fully run job that
happens to have less input.  (I.e., there is no way for our reducer to do
a commit or abort properly.)

(It would be nicer to send an in-band termination signal down stdin, but
I don't think a streaming reducer can do that.)
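If a canceled reducer did receive a SIGTERM or SIGUSR1, as proposed above, it could delete its own uncommitted files. A hedged sketch of the reducer side only: SideEffectGuard and its methods are hypothetical, and according to this thread Hadoop streaming does not currently deliver such a signal, so install_handlers would have nothing to react to today. SIGKILL cannot be caught at all, so this only helps if a catchable signal arrives first.

```python
import os
import signal
import sys
import tempfile


class SideEffectGuard:
    """Hypothetical: track a reducer's uncommitted side-effect files, delete them on abort."""

    def __init__(self, directory):
        self.directory = directory
        self.pending = set()

    def open_temp(self):
        # Returns a writable file object and its temporary path.
        fd, path = tempfile.mkstemp(dir=self.directory, prefix=".partial-")
        self.pending.add(path)
        return os.fdopen(fd, "wb"), path

    def commit(self, tmp_path, final_name):
        # Call only after the file object is closed and the key group is complete.
        os.replace(tmp_path, os.path.join(self.directory, final_name))
        self.pending.discard(tmp_path)

    def abort(self):
        for path in list(self.pending):
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass
        self.pending.clear()

    def install_handlers(self, signums=(signal.SIGTERM, signal.SIGUSR1)):
        """Abort and exit when one of `signums` arrives (Unix only, main thread only)."""
        def handler(signum, frame):
            self.abort()
            sys.exit(128 + signum)  # conventional "killed by signal" exit status
        for s in signums:
            signal.signal(s, handler)
```

A reducer would call install_handlers() once at start-up, write each image through open_temp(), and call commit() only after that key group has been fully processed.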

So what do the Hadoop architects think about side-effects and recovering
from half-run jobs?  Does Hadoop intend to support side-effects (for
interested users, obviously not as standard practice)?  If we were in
Java, would we get a signal we could use to do cleanup?

What do the Hadoop streaming people think?  Is it just a bug that
streaming is not propagating a signal that appears in Javaland?



There's a second way, which is where most of the discussion has gone;
call it the "proper" way:

Rather than writing files as side-effects, the argument is to just
output the data with the standard Hadoop mechanism.  In streaming, this
means through stdout.

Which prompted Yuri's second question:
>>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>>    script *and have Hadoop clean up those files* when it decides to
>>>>    destroy the task?  If there were just one file, I could simply use
>>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.

But I actually think this is not viable for us,
because we're writing images which are binary.
As per Doug's comment:

>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data.

(Doug, we're not doing it this way right now.)

That said, if it worked, this way would clearly be a lot cleaner, since
Hadoop already handles commit/abort for half-run jobs.  Basically all of
our half-run problems go away.  But they're replaced with file-format
problems.

If we were in Java, we could write our own OutputFormat/RecordWriter
classes.  This is what Runping suggested and Yuri was discussing.  I don't
think that works for us (because we're not in Java, although I suppose it
might be made to work).

If we go that way, then we're basically packing many files into one.
If one wants to do that, it seems cleanest to me to use some existing
format, like tar or zip or cpio, or maybe the Hadoop multi-file format.
But this way seems fraught with peril, since we have to fight streaming
and custom record output, and then still extract the files after output
completes anyway.  Lots and lots of work; it feels like this can't be
right.
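For the packing option, Python's standard tarfile module can bundle many binary files into one archive and unpack them again. A minimal sketch with hypothetical helper names; the archive itself is still binary, so it would still have to get through streaming's line-oriented output somehow, which is the peril described above.

```python
import io
import tarfile


def pack_files(files):
    """Hypothetical helper: bundle {name: bytes} into one uncompressed in-memory tar."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def unpack_files(blob):
    """Hypothetical helper: recover {name: bytes} from an archive made by pack_files."""
    files = {}
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r") as tar:
        for member in tar.getmembers():
            if member.isfile():
                files[member.name] = tar.extractfile(member).read()
    return files
```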

(Another hacky way to make this work in streaming is to convert binary to
ASCII, e.g., base64-encode the files.  Been there in SQL.  Done that.
Don't want to do it again.  It still has all the encoding and
post-processing junk.)
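The base64 route can be sketched as one text line per file: name, TAB, and the standard base64 encoding, which contains no TAB, CR or LF and so survives line-oriented handling. The helper names and line layout are assumptions. Base64 turns every 3 input bytes into 4 output characters, so the output is about a third larger than the images.

```python
import base64


def encode_line(filename, data):
    """Hypothetical helper: one text line per file, name TAB base64(data) LF."""
    return f"{filename}\t{base64.b64encode(data).decode('ascii')}\n"


def decode_line(line):
    """Inverse of encode_line; rsplit tolerates a TAB inside the file name."""
    name, payload = line.rstrip("\n").rsplit("\t", 1)
    return name, base64.b64decode(payload)
```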


	
Yuri had a very clever hack that merges the two schemes.  He writes to
random filenames as side-effects, but then writes the side-effect
filenames as Hadoop output.  Therefore Hadoop handles commit/abort, and
after the run he just collects the files that appear in Hadoop's part-*
output and discards the others.

This hack works, but IMHO the reducer should do the commit/abort of
side-effects, not some post-processing job.
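The hack can be sketched under stated assumptions: the reducer emits a tab-separated (final name, temporary name) pair instead of a literal rename command, and committed_lines stands in for the lines read back from the job's committed part-* files. Because only committed attempts contribute lines, any side-* file not listed there must come from a killed or duplicate attempt. reducer_emit and post_process are hypothetical names.

```python
import glob
import os
import tempfile


def reducer_emit(side_dir, final_name, data, stdout_lines):
    """Hypothetical: write the image under a random name, report the mapping as output."""
    fd, tmp_path = tempfile.mkstemp(dir=side_dir, prefix="side-")
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    # This line reaches part-* only if the task attempt is committed.
    stdout_lines.append(f"{final_name}\t{os.path.basename(tmp_path)}\n")


def post_process(side_dir, committed_lines):
    """Hypothetical: rename side-effect files listed in committed output, delete the rest."""
    wanted = {}
    for line in committed_lines:
        final_name, tmp_name = line.rstrip("\n").split("\t")
        wanted[tmp_name] = final_name
    for path in glob.glob(os.path.join(side_dir, "side-*")):
        tmp_name = os.path.basename(path)
        if tmp_name in wanted:
            os.replace(path, os.path.join(side_dir, wanted[tmp_name]))
        else:
            os.unlink(path)  # left behind by a killed or duplicate attempt
```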


So any thoughts about supporting side-effects?


   -John

Re: writing output files in hadoop streaming
2008-01-15 14:54:14
surely the clean way (in a streaming environment) would be to define a
representation of some kind which serialises the output.

http://en.wikipedia.org/wiki/Serialization

after your mappers and reducers have completed, you would then have some
code which deserialises (unpacks) the output as desired.  this would
easily allow you to reconstruct the separate files from a single set of
file fragments.

this approach would entail defining the serialisation / deserialisation
process in a way that is distinct from the actual mappers / reducers, and
then having a little compilation process take that definition and both
create the necessary serialisers / deserialisers and serve as
documentation.

it does have extra overhead, but in the long run it is worth it, since the
interfaces are actually documented.
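A small illustration of declaring the record layout once and deriving both directions from it. The message above describes a compilation step that generates serialisers; this sketch takes the simpler route of interpreting the schema at run time. IMAGE_SCHEMA, its field kinds and the big-endian, length-prefixed layout are all hypothetical.

```python
import struct

# Hypothetical record layout, declared once and shared by writer and reader.
IMAGE_SCHEMA = [("width", "I"), ("height", "I"), ("name", "str"), ("pixels", "bytes")]


def serialise(schema, record):
    parts = []
    for field, kind in schema:
        value = record[field]
        if kind == "str":
            value, kind = value.encode("utf-8"), "bytes"
        if kind == "bytes":
            # Variable-length fields carry a 4-byte big-endian length prefix.
            parts.append(struct.pack(">I", len(value)) + value)
        else:
            parts.append(struct.pack(">" + kind, value))
    return b"".join(parts)


def deserialise(schema, blob):
    record, offset = {}, 0
    for field, kind in schema:
        if kind in ("str", "bytes"):
            (length,) = struct.unpack_from(">I", blob, offset)
            offset += 4
            value = blob[offset:offset + length]
            offset += length
            record[field] = value.decode("utf-8") if kind == "str" else value
        else:
            (record[field],) = struct.unpack_from(">" + kind, blob, offset)
            offset += struct.calcsize(">" + kind)
    return record
```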

Miles

On 15/01/2008, John Heidemann <johnhisi.edu> wrote: