Read A Distributed Tab Delimited Csv

Inspired by this question, I wrote some code to store an RDD (which was read from a Parquet file), with a Schema of (photo_id, data), in pairs, delimited by tabs, and just as a d

Solution 1:

Let's try a simple example. For convenience I'll use the handy toolz library, but it is not really required here.

import sys
import base64

if sys.version_info < (3, ):
    import cPickle as pickle
else:
    import pickle


from toolz.functoolz import compose

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])

Your code is not exactly portable right now. In Python 2 base64.b64encode returns str, while in Python 3 it returns bytes. Let's illustrate that:

  • Python 2

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## str
  • Python 3

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## bytes

So let's add decoding to the pipeline:

# Equivalent to 
# def pickle_and_b64(x):
#     return base64.b64encode(pickle.dumps(x)).decode("ascii")

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps
)
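As a quick local check that needs neither Spark nor toolz, the same encoder can be written as the plain function from the comment above and round-tripped by hand. This is only a sketch; the variable name encoded is mine.

```python
import base64
import pickle


def pickle_and_b64(x):
    # Pickle the object, base64-encode the bytes, then decode to text,
    # so the result is a text string rather than bytes on Python 3.
    return base64.b64encode(pickle.dumps(x)).decode("ascii")


encoded = pickle_and_b64({"foo": "bar"})

# The base64 alphabet is A-Z, a-z, 0-9, '+', '/' and '=',
# so the payload can never contain a tab or a newline.
assert "\t" not in encoded and "\n" not in encoded

# Reversing the two steps gives back the original object.
assert pickle.loads(base64.b64decode(encoded)) == {"foo": "bar"}
```

Because the payload cannot contain the delimiter, it is safe to write it into a tab-separated line.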

Please note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues to serialize only the values and leave the keys untouched:

serialized = rdd.mapValues(pickle_and_b64)
serialized.first()
## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')
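If mapValues is unfamiliar, here is a rough plain-Python picture of what it does to each pair. Note that map_values below is a hypothetical stand-in written for illustration, not part of the Spark API.

```python
def map_values(f, pairs):
    # Apply f to the value of each (key, value) pair; the key passes through unchanged.
    return [(k, f(v)) for k, v in pairs]


pairs = [(1, {"foo": "bar"}), (2, {"bar": "foo"})]

# Replace each dict by the sorted list of its keys, just to show that only values change.
result = map_values(sorted, pairs)
assert result == [(1, ["foo"]), (2, ["bar"])]
```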

Now we can follow it with simple format and save:

from tempfile import mkdtemp
import os

outdir = os.path.join(mkdtemp(), "foo")

serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)

To read the file we reverse the process:

# Equivalent to
# def b64_and_unpickle(x):
#     return pickle.loads(base64.b64decode(x))

b64_and_unpickle = compose(
    pickle.loads,
    base64.b64decode
)

decoded = (sc.textFile(outdir)
    .map(lambda x: x.split("\t"))  # In Python 3 we could simply use str.split
    .mapValues(b64_and_unpickle))

decoded.first()
## (u'1', {'foo': 'bar'})
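Note that the key comes back as a string, because everything read from a text file is text. If you need the original type, you have to convert it yourself. Below is a minimal local sketch of the whole round trip, assuming the keys are integers as in the example; to_line and from_line are hypothetical helper names, not anything Spark provides.

```python
import base64
import pickle


def to_line(pair):
    # Format one (key, value) pair as "key<TAB>base64-encoded pickle".
    key, value = pair
    payload = base64.b64encode(pickle.dumps(value)).decode("ascii")
    return "{0}\t{1}".format(key, payload)


def from_line(line):
    # Split on the first tab only, restore the int key and unpickle the value.
    key, payload = line.split("\t", 1)
    return int(key), pickle.loads(base64.b64decode(payload))


records = [(1, {"foo": "bar"}), (2, {"bar": "foo"})]
lines = [to_line(r) for r in records]
restored = [from_line(line) for line in lines]

assert restored == records
```

With Spark, something like sc.textFile(outdir).map(from_line) should play the same role as the map and mapValues steps above. Keep in mind that unpickling executes whatever the payload describes, so only load files you wrote yourself.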
