scala - How to transform an RDD, DataFrame or Dataset straight to a Broadcast variable without collect?
Is there a way (or plans) to be able to turn Spark distributed collections (RDDs, DataFrames or Datasets) directly into Broadcast variables without the need for a collect? The public API doesn't seem to have anything "out of the box", but can something be done at a lower level?
I can imagine there is a 2x speedup potential (or more?) for these kinds of operations. To explain what I mean in detail, let's work through an example:
val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
This causes the data to be collected to the driver, then the data is broadcasted. This means the data is sent over the network twice.
What would be nice is this:
val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
Here Spark would bypass collecting the data altogether and just move the data between the nodes.
Bonus
Furthermore, there could be a monoid-like API (a bit like combineByKey) for situations where the .toMap or whatever operation on Array[T] is expensive but can possibly be done in parallel. E.g. constructing certain trie structures can be expensive; this kind of functionality could open up awesome scope for algorithm design. This CPU activity could also run while the IO is running, whereas the current broadcast mechanism is blocking (i.e. IO, then CPU, then IO again).
Clarification
Joining is not the (main) use case here; it can be assumed that I use the broadcasted data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, and I don't know which keys I need until I traverse someOtherRdd; suppose also that I use myUberMap multiple times.

I know this all sounds a bit vague, but the point is more general machine learning algorithm design.
While it is a theoretically interesting idea, I will argue that although technically possible it has limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.
Possible implementation:
Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver. On each executor, the executor first attempts to fetch the object from its own BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available. Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.
it should be possible to reuse the same mechanism for direct node-to-node broadcasting.
It is worth noting that this approach cannot completely eliminate driver communication. Even though the blocks could be created locally, you would still need a single source of truth to advertise the set of blocks to fetch.
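For reference, this torrent mechanism is what a plain sc.broadcast already uses under the hood, and the chunk size the object is divided into is configurable. A minimal sketch (the 4m value is just the default, written out explicitly for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// spark.broadcast.blockSize controls the chunk size used by the
// torrent mechanism; 4m is the default, shown here for illustration.
val conf = new SparkConf()
  .setAppName("broadcast-demo")
  .set("spark.broadcast.blockSize", "4m")
val sc = new SparkContext(conf)

// A regular broadcast: the value is chunked on the driver and then
// torrented between the driver and the executors as described above.
val small: org.apache.spark.broadcast.Broadcast[Map[String, String]] =
  sc.broadcast(Map("a" -> "1"))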
Limited applications
One problem with broadcast variables is that they are quite expensive. Even if you could eliminate the driver bottleneck, two problems remain:
- The memory required to store the deserialized object on each executor.
- The cost of transferring the broadcasted data to every executor.
The first problem should be relatively obvious. It is not only about direct memory usage but also about the GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss this further.
From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if the dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach such as a hash join may be preferred in practice.
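To illustrate the targeted approach, here is a sketch of the join-based alternative, reusing the names from the question (the Long payload type is an assumption):

import org.apache.spark.rdd.RDD

// Instead of broadcasting the whole map and probing it inside map(),
// key the large RDD and join: only the matching rows are shuffled
// between nodes, not the entire lookup structure.
def hashJoinAlternative(
    someOtherRdd: RDD[(String, Long)],      // (key, payload) records
    myStringPairRdd: RDD[(String, String)]  // the would-be broadcast map
): RDD[(String, (Long, String))] =
  someOtherRdd.join(myStringPairRdd)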
Alternatives:
There are some methods that can be used to achieve results similar to a direct broadcast while addressing the issues enumerated above, including:
- Passing data via a distributed file system (see the sketch after this list).
- Using a replicated database collocated with the worker nodes.
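A minimal sketch of the first option, assuming the pairs have already been written as a TSV file to a location mounted at the same path on every worker node (e.g. an NFS share; with HDFS you would read via the Hadoop FileSystem API instead). The path, file format, and lookup code are assumptions:

import org.apache.spark.rdd.RDD

object UberMap {
  // Loaded lazily, at most once per executor JVM, straight from shared
  // storage, so the data never has to pass through the driver.
  lazy val value: Map[String, String] =
    scala.io.Source.fromFile("/mnt/shared/uber-map.tsv").getLines()
      .map { line =>
        val Array(k, v) = line.split('\t')
        k -> v
      }
      .toMap
}

// Stand-in for someCodeUsingTheUberMap from the question: a sparse
// lookup against the shared map while traversing someOtherRdd.
def process(someOtherRdd: RDD[String]): RDD[Option[String]] =
  someOtherRdd.map(key => UberMap.value.get(key))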