Scala - Stack overflow error when loading a large table from MongoDB to Spark
Hi all, I have a 1 TB table in MongoDB. I tried to load it into Spark using the Mongo connector, but I keep getting a stack overflow error after 18 minutes of execution:
java.lang.StackOverflowError
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    ....
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
16/06/29 08:42:22 INFO YarnAllocator: Driver requested a total number of 54692 executor(s).
16/06/29 08:42:22 INFO YarnAllocator: Will request 46501 executor containers, each with 4 cores and 5068 MB memory including 460 MB overhead
Is this because I didn't provide enough memory? Or should I provide more storage? I have tried adding a checkpoint, but it doesn't help. I have changed some values in the code because they relate to our company database, but the whole code is still fine for this question.
    import java.sql.Date
    import org.apache.spark.sql.{Row, SQLContext}
    // plus the spark-mongodb connector imports (MongodbConfigBuilder, Host, Database, ...)

    val sqlContext = new SQLContext(sc)
    val builder = MongodbConfigBuilder(Map(
      Host -> List("mymongodurl:mymongoport"),
      Database -> "mymongoddb",
      Collection -> "mymongocollection",
      SamplingRatio -> 0.01,
      WriteConcern -> "normal"))
    val readConfig = builder.build()
    val mongoRDD = sqlContext.fromMongoDB(readConfig)
    mongoRDD.registerTempTable("mytable")

    val dataFrame = sqlContext.sql("SELECT u_at, c_at FROM mytable")
    val deltaCollect = dataFrame.filter("u_at is not null and c_at is not null and u_at != c_at").rdd
    val mapDelta = deltaCollect.map {
      case Row(u_at: Date, c_at: Date) =>
        if (u_at.getTime == c_at.getTime) {
          (0.toString, 0L)
        } else {
          val delta = (u_at.getTime - c_at.getTime) / 1000 / 60 / 60 / 24
          (delta.toString, 1L)
        }
    }
    val reduceRet = mapDelta.reduceByKey(_ + _)
    val outputPath = s"./dump"
    reduceRet.saveAsTextFile(outputPath)
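For what it's worth, the day-count inside the map step is plain integer arithmetic on epoch milliseconds. A minimal standalone check of just that expression (`deltaDays` is a hypothetical helper mirroring the line in my job, not part of the connector):

```scala
object DeltaCheck {
  // Same integer arithmetic as the map step: milliseconds -> whole days,
  // truncating toward zero at each division.
  def deltaDays(uAtMillis: Long, cAtMillis: Long): Long =
    (uAtMillis - cAtMillis) / 1000 / 60 / 60 / 24

  def main(args: Array[String]): Unit = {
    val threeDaysMs = 3L * 24 * 60 * 60 * 1000
    println(deltaDays(threeDaysMs, 0L)) // prints 3
    println(deltaDays(0L, 0L))          // prints 0
  }
}
```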
As you know, Apache Spark does in-memory processing while executing a job, i.e. it loads the data it works on into memory. Here, per the question and the comments, the dataset is very large (about 1 TB) while the memory available to Spark is only around 8 GB per core. Hence the Spark executor runs out of memory in this scenario.
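For context, the per-executor figures visible in the log come from the submit-time settings. A hedged sketch of where they are set (these are the standard `spark-submit` flags and the Spark 1.x YARN overhead property; the values here are only illustrative, chosen to roughly match the ~5068 MB in the log):

```shell
# Illustrative only: 4608m heap + 460 MB overhead ~ the 5068 MB the log reports.
spark-submit \
  --master yarn \
  --executor-cores 4 \
  --executor-memory 4608m \
  --conf spark.yarn.executor.memoryOverhead=460 \
  my-job.jar
```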
To avoid this, you can follow either of the two options below:
- Change the RDD storage level to MEMORY_AND_DISK. That way Spark will not load the full data into memory; rather, it will try to spill data to disk. However, performance decreases because of the traffic between memory and disk. Check out RDD persistence.
- Increase the core memory so that Spark can load the whole 1 TB of data into memory. That way performance is good, but infrastructure cost increases.
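Applied to the code in the question, option 1 is essentially a one-line change at the point where the RDD is created. A minimal sketch, assuming the same `sqlContext` and temp table as above (this needs a running SparkContext and the connector on the classpath, so it is untested here):

```scala
import org.apache.spark.storage.StorageLevel

// Partitions that do not fit in memory are spilled to local disk
// instead of being kept entirely on the heap.
val deltaCollect = dataFrame
  .filter("u_at is not null and c_at is not null and u_at != c_at")
  .rdd
  .persist(StorageLevel.MEMORY_AND_DISK)
```

Once `reduceByKey` has materialized its result, you can call `deltaCollect.unpersist()` to free the cached partitions.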