Scala - HDFS Snappy corrupted data ("Could not decompress data. Input is invalid."). When does this occur and can it be prevented?
I am writing data to HDFS with the following code:
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, CompressionOutputStream}

// Wraps an HDFS create-or-append stream in a Snappy compression stream.
def createOrAppendHadoopSnappy(path: Path, hdfs: FileSystem): CompressionOutputStream = {
  val compressionCodecFactory = new CompressionCodecFactory(hdfs.getConf)
  val snappyCodec = compressionCodecFactory.getCodecByClassName(
    classOf[org.apache.hadoop.io.compress.SnappyCodec].getName)
  snappyCodec.createOutputStream(createOrAppend(path, hdfs))
}

// Appends to the file if it already exists, otherwise creates it.
def createOrAppend(path: Path, hdfs: FileSystem): FSDataOutputStream = {
  if (hdfs.exists(path)) {
    hdfs.append(path)
  } else {
    hdfs.create(path)
  }
}
And the code calling this function is roughly:
...
val outputStream = new BufferedOutputStream(HdfsUtils.createOrAppendHadoopSnappy(filePath, fileSystem))
...
for (managedOutputStream <- managed(outputStream)) {
  IOUtils.writeLines(lines.asJavaCollection, "\n", managedOutputStream, "UTF-8")
}
...
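For reference, the stream chain here is BufferedOutputStream -> CompressionOutputStream -> FSDataOutputStream, and a clean shutdown has to propagate the close all the way down so the Snappy codec can finish its final compressed block. Below is a minimal sketch of the same write with the close spelled out in an explicit try/finally instead of scala-arm's managed; HdfsUtils.createOrAppendHadoopSnappy is the helper above, while writeLinesSnappy and its parameters are names I made up for illustration.

import java.io.BufferedOutputStream
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.JavaConverters._

def writeLinesSnappy(lines: Seq[String], filePath: Path, fileSystem: FileSystem): Unit = {
  val compressedStream = HdfsUtils.createOrAppendHadoopSnappy(filePath, fileSystem)
  val outputStream = new BufferedOutputStream(compressedStream)
  try {
    IOUtils.writeLines(lines.asJavaCollection, "\n", outputStream, "UTF-8")
  } finally {
    // Closing the BufferedOutputStream flushes its buffer and closes the wrapped
    // CompressionOutputStream, which finishes the Snappy stream and then closes
    // the underlying FSDataOutputStream. If the JVM dies before this finally
    // block runs, the last compressed block can be left incomplete on HDFS.
    outputStream.close()
  }
}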
Now at one point I have had a few corrupted files, with the following message when reading them with 'hadoop fs -text': java.lang.InternalError: Could not decompress data. Input is invalid.
Note that this code runs with Spark on YARN. Due to changes in the code, the job was killed by YARN once, and once I manually killed the Spark job during a later test.
I then wanted to try to reproduce the scenario in which the corrupt files are generated, but so far I have not succeeded. I tried interrupting the write in different ways (with System.exit(0), an exception, and a manual Ctrl-C). The file was not completely written, but it did not produce the java.lang.InternalError exception when read.
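One difference between those tests and the YARN kill is that System.exit(0) and a Ctrl-C still let JVM shutdown hooks run, while a container that is killed hard (SIGKILL) stops with no cleanup at all. A rough, speculative sketch of a harsher reproduction attempt is below: it halts the JVM in the middle of a write, bypassing shutdown hooks and finally blocks. The surrounding names follow the code above; whether this actually triggers the InternalError on read will depend on how much data had already been flushed to HDFS.

// Speculative reproduction sketch: terminate the JVM mid-write without any cleanup,
// to mimic a hard kill of the YARN container.
val outputStream = new BufferedOutputStream(
  HdfsUtils.createOrAppendHadoopSnappy(filePath, fileSystem))
IOUtils.writeLines(lines.asJavaCollection, "\n", outputStream, "UTF-8")
// Deliberately no flush/close: Runtime.halt terminates immediately and skips
// shutdown hooks, so buffered and unfinished Snappy data never reaches HDFS cleanly.
Runtime.getRuntime.halt(1)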
Does anyone know in which cases these corrupted files are generated, why this happens, and how/if it can be prevented?