scala - Stack overflow error when loading a large table from MongoDB to Spark


Hi all, I have a table of about 1 TB in MongoDB. I tried to load it into Spark using the Mongo connector, but I keep getting a stack overflow after 18 minutes of execution.

java.lang.StackOverflowError
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    ....
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

16/06/29 08:42:22 INFO YarnAllocator: Driver requested a total number of 54692 executor(s).
16/06/29 08:42:22 INFO YarnAllocator: Will request 46501 executor containers, each with 4 cores and 5068 MB memory including 460 MB overhead

Is this because I didn't provide enough memory? Or should I provide more storage? I have tried adding a checkpoint, but it doesn't help. I have changed some values in the code because they relate to the company database, but the code as a whole still reflects the question.

val sqlContext = new SQLContext(sc)

val builder = MongodbConfigBuilder(Map(
  Host -> List("mymongodurl:mymongoport"),
  Database -> "mymongoddb",
  Collection -> "mymongocollection",
  SamplingRatio -> 0.01,
  WriteConcern -> "normal"))
val readConfig = builder.build()

val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("mytable")

val dataFrame = sqlContext.sql("SELECT u_at, c_at FROM mytable")
val deltaCollect = dataFrame.filter("u_at is not null and c_at is not null and u_at != c_at").rdd
val mapDelta = deltaCollect.map {
  case Row(u_at: Date, c_at: Date) => {
    if (u_at.getTime == c_at.getTime) {
      (0.toString, 0L)
    } else {
      val delta = (u_at.getTime - c_at.getTime) / 1000 / 60 / 60 / 24
      (delta.toString, 1L)
    }
  }
}
val reduceRet = mapDelta.reduceByKey(_ + _)

val output_path = s"./dump"
reduceRet.saveAsTextFile(output_path)
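(For reference, a minimal sketch of how the checkpoint mentioned above is typically wired in; the checkpoint directory is a hypothetical example and not taken from the original code.)

sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical directory
deltaCollect.checkpoint()  // marks the RDD for checkpointing; its lineage is truncated once it is materialized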

As you know, Apache Spark does in-memory processing while executing a job, i.e. it loads the data being worked on into memory. Here, as per the question and comments, you have a dataset as large as 1 TB while the memory available to Spark is around 8 GB per core. Hence the Spark executor runs out of memory in this scenario.

To avoid this, you can follow either of the two options below:

  1. Change the RDD storage level to MEMORY_AND_DISK. This way Spark will not load the full data into memory; rather it will try to spill the data to disk. But this way performance will decrease because of the transactions done between memory and disk. Check out RDD persistence (a sketch of this option follows the list).
  2. Increase the core memory so that Spark can load even 1 TB of data into memory. This way performance will be good, but the infrastructure cost will increase.
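A minimal sketch of option 1, assuming the same pipeline and variable names as in the question (dataFrame comes from the Stratio Spark-MongoDB code above):

import org.apache.spark.storage.StorageLevel

// Persist the intermediate RDD with MEMORY_AND_DISK so that partitions which
// do not fit in memory are spilled to local disk instead of failing the executor.
val deltaCollect = dataFrame
  .filter("u_at is not null and c_at is not null and u_at != c_at")
  .rdd
  .persist(StorageLevel.MEMORY_AND_DISK)

For option 2, the per-executor resources can be raised at submit time, e.g. spark-submit --executor-memory 16g --executor-cores 4 ... (the values here are illustrative, not a recommendation).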
