scala - How to transform RDD, DataFrame or Dataset straight to a Broadcast variable without collect?


Is there a way (or any plans) to turn Spark distributed collections (RDDs, DataFrames or Datasets) directly into broadcast variables without the need to collect? The public API doesn't seem to have this "out of the box", but can it be done at a lower level?

I can imagine there is roughly a 2x speedup potential (or more?) for these kinds of operations. To explain what I mean in detail, let's work through an example:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

This causes the data to be collected to the driver, and then the data is broadcasted. That means the data is sent over the network twice.

What would be nice is this:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

Here Spark could bypass collecting the data altogether and just move the data between the nodes.

Bonus

Furthermore, there could be a monoid-like API (a bit like combineByKey) for situations where .toMap, or whatever operation on Array[T], is expensive but can possibly be done in parallel. E.g. constructing certain trie structures can be expensive; this kind of functionality could open up awesome scope for algorithm design. This CPU activity could also run while the IO is running - whereas the current broadcast mechanism is blocking (i.e. IO, then CPU, then IO again).
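To make the "monoid-like API" idea concrete, here is a minimal driver-side sketch. The names `Monoid`, `buildFromPartitions` and the partition layout are all hypothetical - none of this exists in Spark's public API; it only illustrates the createCombiner/mergeCombiners split that combineByKey uses:

```scala
// Hypothetical sketch of a monoid-style combine for building a broadcast
// structure in parallel. Nothing here is Spark API.
object MonoidBroadcastSketch {
  // Hypothetical type class: an associative combine with an identity.
  trait Monoid[M] {
    def empty: M
    def combine(a: M, b: M): M
  }

  // Map union is associative, so partial maps can be merged in any order.
  val mapMonoid: Monoid[Map[String, String]] =
    new Monoid[Map[String, String]] {
      def empty: Map[String, String] = Map.empty
      def combine(a: Map[String, String],
                  b: Map[String, String]): Map[String, String] = a ++ b
    }

  // Each partition builds its partial structure (possibly expensive, e.g.
  // a trie), then the partials are merged pairwise - analogous to
  // combineByKey's createCombiner / mergeCombiners.
  def buildFromPartitions[T, M](partitions: Seq[Seq[T]],
                                build: Seq[T] => M,
                                m: Monoid[M]): M =
    partitions.map(build).foldLeft(m.empty)(m.combine)
}
```

Because the combine is associative, the per-partition `build` calls could run concurrently with the data transfer, which is exactly the overlap of CPU and IO described above.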

Clarification

Joining is not the (main) use case here; it can be assumed that I use the broadcasted data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, but I don't know which keys I need until I traverse someOtherRdd, and suppose I use myUberMap multiple times.

I know this sounds a bit vague, but the point is aimed at general machine learning algorithm design.

While it is a theoretically interesting idea, I will argue that although it is technically possible, it has limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.

Possible implementation:

Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors, if available.

Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.
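The chunking step described above can be illustrated with a standalone toy sketch (this is not Spark's implementation; in Spark the block size is governed by the spark.broadcast.blockSize setting, and blocks live in the BlockManager rather than plain byte arrays):

```scala
// Toy illustration of torrent broadcast's chunking: the driver splits the
// serialized object into fixed-size blocks, and an executor reassembles
// the object once it has fetched every block (from the driver or peers).
object TorrentSketch {
  def chunk(serialized: Array[Byte], blockSize: Int): Seq[Array[Byte]] =
    serialized.grouped(blockSize).toSeq

  def reassemble(blocks: Seq[Array[Byte]]): Array[Byte] =
    blocks.flatten.toArray
}
```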

it should be possible to reuse the same mechanism for direct node-to-node broadcasting.

It is worth noting that this approach cannot completely eliminate driver communication. Even though the blocks could be created locally, you still need a single source of truth to advertise the set of blocks to fetch.

Limited applications

One problem with broadcast variables is that they are quite expensive. Even if you could eliminate the driver bottleneck, two problems remain:

  • The memory required to store the deserialized object on each executor.
  • The cost of transferring the broadcasted data to every executor.

The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. It is partially covered in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss it further.

From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if the dataset is large enough for the driver to become a bottleneck, it is an unlikely candidate for broadcasting, and a more targeted approach such as a hash join may be preferred in practice.
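A back-of-envelope cost model makes the comparison concrete. This is an assumed model (it ignores locality, compression and replication): broadcasting a dataset of r bytes to n executors ships roughly n * r bytes, while a shuffled hash join ships each side across the network about once:

```scala
// Assumed network cost model, not measured Spark behavior.
object JoinCostModel {
  // Every executor receives a full copy of the broadcast side.
  def broadcastBytes(r: Long, numExecutors: Int): Long =
    numExecutors.toLong * r

  // A shuffle moves each side across the network roughly once.
  def shuffleJoinBytes(r: Long, s: Long): Long =
    r + s
}
```

Under this model, with a 1 GB lookup side and 100 executors, broadcast ships about 100 GB, while shuffling 1 GB against a 10 GB other side ships about 11 GB - which is why large broadcast candidates rarely pay off.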

Alternatives:

There are some methods which can be used to achieve similar results to a direct broadcast and address the issues enumerated above, including:

  • Passing data via a distributed file system.
  • Using a replicated database collocated with the worker nodes.
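Both alternatives typically rely on a load-once-per-JVM pattern: tasks reference a lazy val in a singleton object, so each executor JVM loads the data at most once no matter how many tasks run on it. A minimal sketch (the file system / database read is stubbed with an in-memory map; `SharedLookup` and `loadCount` are names invented here):

```scala
// Sketch of the load-once-per-executor-JVM pattern.
object SharedLookup {
  var loadCount = 0 // only here to demonstrate single initialization

  lazy val table: Map[String, String] = {
    loadCount += 1
    // In a real job this would read from a distributed file system
    // (e.g. HDFS/S3) or query a collocated replicated database.
    Map("key" -> "value")
  }
}
```

Any closure shipped to the executors can then call `SharedLookup.table`; the lazy val guarantees the load happens at most once per JVM, sidestepping the double network transfer of collect-then-broadcast.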
