D. Metrics

We now give an overview of how each performance characteristic was measured.

Memory Footprint

Failure due to underprovisioned resources is deadly in production data science, so we aim for predictable memory usage that lets us prevent most crashes of this kind. There are many ways to measure memory usage. The total bytes requested through malloc() or occupied by Python objects do not tell us what the operating system actually has to provide. Peak resident memory usage gives more operational insight.

We use the following two functions to profile memory for each benchmark. They were adapted from Fabian Pedregosa's excellent blog post on measuring memory usage in Python:

def memory_usage_psutil(process_name):
    # Return the combined resident set size (RSS), in MB, of all processes
    # whose command name matches process_name. get_pids() is a helper
    # (defined elsewhere) that looks up the matching PIDs.
    import psutil
    mem = 0
    pids = get_pids(process_name)
    for pid in pids:
        process = psutil.Process(pid)
        # memory_info()[0] is the RSS in bytes; convert to MB.
        pmem = process.memory_info()[0] / float(2 ** 20)
        mem += pmem
    return mem

def memory_usage_resource():
    # Return the peak resident set size (RSS) of the current process in MB.
    import resource
    import sys
    # ru_maxrss is reported in kilobytes on Linux but in bytes on OS X.
    rusage_denom = 1024.
    if sys.platform == 'darwin':
        rusage_denom = rusage_denom * rusage_denom
    mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rusage_denom
    return mem

The first function, memory_usage_psutil, finds all running processes with a given command name and sums their resident set sizes (RSS). The second function, memory_usage_resource, returns the peak RSS of the current Python process.

During the Spark benchmarks, all running java processes belonged to Spark, so the memory footprint of Spark includes both the local Python process of the PySpark master and the java processes:

total_pyspark_mem_used = memory_usage_resource() + memory_usage_psutil("java")

This returns the total memory used in megabytes.

In R, we call ps on the calling process's PID to obtain its resident set size in kilobytes, and then convert to bytes:

memory_usage <- function() {
  # ps -o rss reports the resident set size in kilobytes; convert to bytes.
  rss_kb <- system(paste("ps -o rss", Sys.getpid(), "| tail -1"), intern=TRUE)
  return(strtoi(rss_kb) * 1024)
}

Filesystem and Disk Caches

Linux caches previous reads very aggressively, and the filesystem cache persists even after the process that did the reading dies. IPython's %timeit magic function is tempting:

%timeit pandas.read_csv("filename.csv")

but the numbers it reports say little about cold read performance. After the first read, there may be no significant difference between drastically different storage systems. The plot in the Machine Setup section clearly shows that RAID-0 (2 ephemeral SSDs) and EBS have the same throughput on the second and subsequent reads. However, when the cache is cleared, the throughput of RAID-0 (2 SSDs) is 6x that of EBS, and the throughput of RAID-0 with 8 instance SSDs is 25x that of EBS. Again, these differences may vanish on the second read.

Clearing the Filesystem Cache

Before running any benchmark, all pending writes must be synced to disk and the caches must be flushed. The clear_cache.sh bash script below is used for this purpose.

clear_cache.sh
#!/usr/bin/env bash
#
# Ensure all writes are synced to disk.
sudo bash -c "sync || sync || sync || sync"
# Clear the caches
sudo bash -c "echo 3 > /proc/sys/vm/drop_caches"

The ./clear_cache.sh command is executed in between cold benchmark trials and before the first trial of warm benchmarks. In the warm case, the first trial is always discarded and an additional trial is performed to compensate.
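As a rough sketch of how the trials might be orchestrated (the actual harness involves more bookkeeping), run_once below is a hypothetical stand-in for a single timed load:

import os
from time import time

def run_trials(run_once, num_trials, cold):
    # run_once() is a hypothetical callable that performs one load.
    timings = []
    os.system("./clear_cache.sh")              # start from a cold cache
    for trial in range(num_trials if cold else num_trials + 1):
        if cold and trial > 0:
            os.system("./clear_cache.sh")      # re-clear between cold trials
        tic = time()
        run_once()
        elapsed = time() - tic
        if cold or trial > 0:                  # warm case: discard the first trial
            timings.append(elapsed)
    return timings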

Writes faster than reads: not so fast!

These benchmarks do not address the performance of file writers, but timing writes is worth some discussion. The IPython %time magic function is a tempting way to measure how long it takes to write a file:

%time my_write_function("myfile")

However, it is unreliable because the writes may still be cached in memory after the function returns. Moreover, timing a read of the file that was just written gives a bad estimate of cold read performance, since some of the file contents may be served from memory. It is also a bad estimate of warm read performance because the OS may still be flushing the write cache while it reads from the file.

In the example below, it seemingly took 20 seconds to copy 45 GB of files to a general-purpose EBS volume.

$ sync
$ time cp /raid0-2/*.csv /public
real    0m20.492s
user    0m0.001s
sys     0m0.002s

The implied write throughput of 2.25 GB/s (45 GB in 20 seconds) well exceeds the volume's bandwidth of 250 MB/s, so most of the data must still be sitting in the write cache. The sync command ensures all cached writes are flushed to disk:

$ time sync

real    3m50.049s
user    0m0.001s
sys     0m0.003s

Unsurprisingly, the sync takes about 3 minutes and 50 seconds to complete, and the resulting throughput estimate of roughly 195 MB/s is far more reasonable. However, timing only the sync leaves the writes completed during the cp itself untimed. Timing the write and the sync together gives a more reliable estimate:

os.system("sync")
tic = time()
my_write_function("mytime")
os.system("sync")
toc = time()

Tasks

We tested each method on three different tasks:

  1. load: loading a CSV file or binary file.
  2. sum: summing the data loaded. For numeric columns, we sum the values; for text columns, we sum the lengths of the strings.
  3. copy: copying the loaded data from Spark/Dato/ParaText's internal representation to Python.

Task #1: Loading the Data

The code used to load a CSV or binary file for each method is shown in this section. In our benchmarks, the loading step is timed separately from the transfer step so we can consider their relative overheads independently.
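In outline, each method is timed in two separate steps; load_func and transfer_func below are hypothetical stand-ins for the per-method snippets shown in this section and in Task #3:

from time import time

tic = time()
loaded = load_func(params)          # Task #1: parse the file
load_seconds = time() - tic

tic = time()
copied = transfer_func(loaded)      # Task #3: copy the result into Python
transfer_seconds = time() - tic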

Wise.io ParaText

In the transfer step, we convert to a dictionary of arrays rather than a DataFrame. This is because constructing a DataFrame can add as much as 2x to peak memory, an overhead that is beyond our control.
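A rough way to observe this overhead, using memory_usage_resource() from above (the exact factor depends on the Pandas version and the column types):

import numpy as np
import pandas as pd

# Five float columns of ten million rows each (roughly 400 MB of data).
d = {"col%d" % i: np.random.rand(10000000) for i in range(5)}

before = memory_usage_resource()    # peak RSS so far, in MB
df = pd.DataFrame(d)                # the constructor copies the columns
after = memory_usage_resource()
print("DataFrame construction added about %.0f MB to peak RSS" % (after - before))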

We time the following for the load step:

loader = paratext.internal_create_csv_loader(**params)

This completes all of the reads for the benchmark; the contents of the CSV file are fully loaded into memory on the C++ side.

DataBricks SparkCSV

sdf = sqlContext.read.format('com.databricks.spark.csv') \
                     .option('header', str(bool(header)).lower()) \
                     .option('inferschema', 'true') \
                     .load(filename)

Dato SFrame

sf = sframe.SFrame.read_csv(params["filename"], header=header, column_type_hints=dtypes)

R read.csv

df = read.csv(filename)

R data.table fread

df = fread(filename)

R readr read_csv

df = read_csv(filename)

NumPy loadtxt

if params.get("header", True):
   skiprows = 1
else:
   skiprows = 0
X = np.loadtxt(params["filename"], skiprows=skiprows, delimiter=',')

Pandas read_csv

df = pandas.read_csv(params["filename"], header=header)

HDF5 via H5Py

f = h5py.File(params["filename"], "r")
ds = f[params["dataset"]]
X = ds[:, :]

Feather

df = feather.read_dataframe(params["filename"])

NumPy NPY

X = np.load(params["filename"])

Pickle

fid = open(params["filename"], "rb")
df = pickle.load(fid)
fid.close()

Task #2: Summing the Columns

Some binary readers memory-map a file and "read" it lazily. Therefore, timing the load function alone is not always a reliable indicator of a loader's performance on a cold disk. To ensure all of the data is actually read, we sum the numeric columns and sum the lengths of the strings in the text columns.
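For example (an illustration rather than part of the benchmark), NumPy can memory-map an .npy file, in which case the load returns almost immediately and the data is only read from disk when it is touched; myfile.npy is a hypothetical file name:

import numpy as np

X = np.load("myfile.npy", mmap_mode="r")   # returns quickly; no data read yet
total = X.sum()                            # touching every element forces the reads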

DataBricks SparkCSV

def sum_spark_dataframe(df):
    from pyspark.sql import functions as F
    # Split the schema into string and non-string columns.
    str_columns = [field.name for field in df.schema.fields if str(field.dataType) == "StringType"]
    num_columns = [field.name for field in df.schema.fields if str(field.dataType) != "StringType"]
    # Sum string lengths for text columns and the values for numeric columns.
    str_length_sums = [F.sum(F.length(df[k])) for k in str_columns]
    num_sums = [F.sum(df[k]) for k in num_columns]
    sums = str_length_sums + num_sums
    s = df.agg(*sums).collect()
    return s

Dato SFrame

def sum_sframe(df):
    s = {}
    for key in df.column_names():
        if df[key].dtype() in (str, unicode):
            s[key] = df[key].apply(lambda x: len(x)).sum()
        else:
            s[key] = df[key].sum()
    return s

Pandas DataFrame

In Pandas, a column can mix numbers and strings, which breaks df[key].sum(). There is no easy way to check whether a column has a uniform type, so we fall back through the possible cases with try/except statements.
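For example, a column holding both a number and a string ends up with dtype object, and summing it typically raises a TypeError:

import pandas as pd

s = pd.Series([1, "a", 2.5])   # mixed types give dtype=object
s.sum()                        # typically raises a TypeError (cannot add int and str)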

def sum_dataframe(df):
    s = {}
    for key in df.keys():
        if df[key].dtype in (str, unicode, object):
            try:
                s[key] = df[key].apply(lambda x: len(x)).sum()
            except:
                try:
                    s[key] = df[key].values.sum()
                except:
                    s[key] = np.nan
        else:
            s[key] = df[key].values.sum()
    return s

We found that df[key].sum() is noticeably slower than df[key].values.sum().
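A quick way to check this in an IPython session on a synthetic float column (the exact numbers vary by machine and Pandas version):

import numpy as np
import pandas as pd

s = pd.Series(np.random.rand(10000000))

%timeit s.sum()          # Pandas' NaN-aware reduction
%timeit s.values.sum()   # plain NumPy sum on the underlying array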

Dictionary of Arrays (DictFrame)

def sum_dictframe(d, levels):
    s = {}
    for key in d.keys():
        if key in levels.keys():
            # Categorical column: d[key] holds integer codes into levels[key],
            # so sum the lengths of the corresponding level strings.
            level_sums = np.array([len(el) for el in levels[key]])
            s[key] = level_sums[d[key]].sum()
        else:
            # Numeric column: sum the values directly.
            s[key] = d[key].sum()
    return s

NumPy Arrays

def sum_ndarray(X):
    s = {}
    rows, cols = X.shape
    for col in xrange(0, cols):
        if type(X[0, col]) in (str, unicode):
            s[col] = pandas.Series(X[:,col]).apply(len).sum()
        else:
            s[col] = X[:,col].sum()
    return s

R DataFrames

s <- colSums(Filter(is.numeric, df))
s2 <- apply(Filter(function(x){!is.numeric(x)}, df), 2, function(x){sum(nchar(x))})

Task #3: Copy to Python

The third task tests the interactive data science experience. We measure the latency and memory footprint incurred when converting a data frame to a Python data structure.

Wise.io ParaText

transfer = paratext.internal_csv_loader_transfer(loader, forget=True)
dict_frame = {}
levels = {}
for column_name, column_data, column_semantics, column_levels in transfer:
    # Categorical columns also carry their level strings.
    if column_semantics == 'cat':
        levels[column_name] = column_levels
    dict_frame[column_name] = column_data

DataBricks SparkCSV

pdf = sdf.toPandas()

Dato SFrame

df = sf.to_dataframe()

results matching ""

    No results matching ""