scala - Stack overflow error when loading a large table from MongoDB to Spark -


Hi all, I have a table of about 1 TB in MongoDB. I tried to load it into Spark using the Mongo connector, but I keep getting a stack overflow after about 18 minutes of execution.

java.lang.StackOverflowError
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    ....
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

16/06/29 08:42:22 INFO YarnAllocator: Driver requested a total number of 54692 executor(s).
16/06/29 08:42:22 INFO YarnAllocator: Will request 46501 executor containers, each with 4 cores and 5068 MB memory including 460 MB overhead

Is this because I didn't provide enough memory? Or should I provide more storage? I have tried adding a checkpoint, but it doesn't help. I have changed some values in the code because they relate to the company database, but the whole code is still fine for the question.

// Imports assume the Stratio spark-mongodb connector; package names may differ between versions
import java.sql.Date // adjust to java.sql.Timestamp if the connector maps BSON dates to timestamps
import org.apache.spark.sql.{Row, SQLContext}
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._

val sqlContext = new SQLContext(sc)

// build the read configuration for the Mongo collection (names changed, they relate to the company database)
val builder = MongodbConfigBuilder(Map(Host -> List("myMongoDbUrl:myMongoPort"), Database -> "myMongoDb", Collection -> "myMongoCollection", SamplingRatio -> 0.01, WriteConcern -> "normal"))
val readConfig = builder.build()

val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("myTable")

val dataFrame = sqlContext.sql("SELECT u_at, c_at FROM myTable")
val deltaCollect = dataFrame.filter("u_at is not null and c_at is not null and u_at != c_at").rdd
val mapDelta = deltaCollect.map {
  case Row(u_at: Date, c_at: Date) =>
    if (u_at.getTime == c_at.getTime) {
      (0.toString, 0L)
    } else {
      // difference between update time and creation time, in whole days
      val delta = (u_at.getTime - c_at.getTime) / 1000 / 60 / 60 / 24
      (delta.toString, 1L)
    }
}
val reduceRet = mapDelta.reduceByKey(_ + _)

val output_path = "./dump"
reduceRet.saveAsTextFile(output_path)

As you know, Apache Spark does in-memory processing while executing a job, i.e. it loads the data it works on into memory. Here, per the question and the comments, you have a dataset as large as 1 TB while the memory available to Spark is around 8 GB per core. Hence the Spark executors run out of memory in this scenario.

To avoid this, you can follow either of the two options below:

  1. Change the RDD storage level to MEMORY_AND_DISK. That way Spark will not load the full data into memory; rather, it will try to spill the data to disk. However, performance decreases because of the traffic between memory and disk. Check out RDD Persistence in the Spark documentation; a minimal sketch follows after this list.
  2. Increase the memory per core/executor so that Spark can fit more of the 1 TB of data in memory. That way performance stays good, but the infrastructure cost increases.
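
For the first option, here is a minimal sketch. It reuses the variable names from the question's code; exactly where to persist, and whether this alone resolves the StackOverflowError, is an assumption on my part:

import org.apache.spark.storage.StorageLevel

// persist the filtered RDD with MEMORY_AND_DISK so partitions that do not fit in
// executor memory are spilled to local disk instead of failing the job
val deltaCollect = dataFrame
  .filter("u_at is not null and c_at is not null and u_at != c_at")
  .rdd
  .persist(StorageLevel.MEMORY_AND_DISK)

For the second option, executor memory is set at submit time, for example spark-submit --executor-memory 16g --executor-cores 4 ... (the numbers are only illustrative; size them to your cluster).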
