D. Metrics
We now give an overview of how each performance characteristic was measured.
Memory Footprint
Failure due to underprovisioned resources is deadly in production data science, so we aim for memory usage predictable enough to prevent crashes of this kind in most cases. There are many ways to measure memory usage, but neither the total bytes requested via malloc() nor the bytes occupied by Python objects reflects what the OS actually reserves for a process. Peak memory usage gives more operational insight.
We use the following two functions to profile memory for each benchmark. They were adapted from Fabian Pedregosa's excellent blog post on measuring memory usage in Python:
def memory_usage_psutil(process_name):
    # Return the total memory usage in MB of all processes
    # matching process_name.
    import psutil
    mem = 0
    pids = get_pids(process_name)  # helper: PIDs matching the name
    for pid in pids:
        process = psutil.Process(pid)
        pmem = process.memory_info()[0] / float(2 ** 20)  # RSS in MB
        mem += pmem
    return mem
def memory_usage_resource():
    # Return the peak memory usage (ru_maxrss) of the current
    # process in MB. Linux reports ru_maxrss in KB; OS X in bytes.
    import resource
    import sys
    rusage_denom = 1024.
    if sys.platform == 'darwin':
        rusage_denom = rusage_denom * rusage_denom
    mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rusage_denom
    return mem
The first function, memory_usage_psutil, finds all processes with a given command name and sums their resident set sizes (RSS). The second function, memory_usage_resource, returns the peak RSS of the current Python process.
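The get_pids helper used by memory_usage_psutil is not shown above; a minimal sketch using psutil (an assumption on our part, not the exact helper from the post) could be:
def get_pids(process_name):
    # Hypothetical helper: return the PIDs of all running processes
    # whose command name matches process_name.
    import psutil
    return [p.pid for p in psutil.process_iter()
            if p.name() == process_name]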
All instances of java belonged to Spark during the Spark benchmarks, so the memory footprint of Spark includes both the local Python process of the PySpark master and the java processes:
total_pyspark_mem_used = memory_usage_resource() + memory_usage_psutil("java")
This returns the total memory used in megabytes.
In R, we call ps on the calling process's PID to obtain its memory usage in kilobytes, then convert to bytes:
memory_usage <- function() {
  # ps -o rss reports the RSS in KB; convert to bytes.
  return(strtoi(system(paste("ps -o rss", Sys.getpid(), "| tail -1"), intern=TRUE)) * 1024)
}
Filesystem and Disk Caches
Linux is very aggressive about caching previous reads, and the filesystem cache persists even after a process dies. IPython's timeit magic function is tempting:
%timeit pandas.read_csv("filename.csv")
but the numbers it reports say little about cold read performance. After the first read, drastically different storage systems may show no significant difference at all. The plot in the Machine Setup section clearly shows that RAID-0 (2 ephemeral SSDs) and EBS have the same throughput on the second and subsequent reads. When the cache is cleared, however, the throughput of RAID-0 (2 SSDs) is 6x that of EBS, and the throughput of RAID-0 with 8 instance SSDs is 25x that of EBS. Again, these differences may vanish on the second read.
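A minimal illustration of the cache effect (not one of the benchmark scripts):
import pandas
from time import time

for trial in range(2):
    tic = time()
    pandas.read_csv("filename.csv")
    print("trial %d: %.2f s" % (trial + 1, time() - tic))
The second trial is typically much faster because the file is served from the page cache rather than the storage device.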
Clearing the Filesystem Cache
All writes must be synced and the caches must be flushed before running any benchmark. The following bash commands, collected in the clear_cache.sh script, serve this purpose.
clear_cache.sh
#!/usr/bin/env bash
#
# Ensure all writes are synced to disk.
sudo bash -c "sync || sync || sync || sync"
# Clear the caches
sudo bash -c "echo 3 > /proc/sys/vm/drop_caches"
The ./clear_cache.sh command is executed between cold benchmarks and before the first trial of warm benchmarks. In the warm case, the first trial is always discarded and an additional trial is performed.
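A minimal sketch of this protocol (load_file stands in for the method under test; this is an illustration, not the benchmark harness itself):
import os
from time import time

def run_trials(load_file, filename, trials, cold):
    # Cold trials clear the cache before every read; warm trials
    # clear it only once and discard the first read.
    os.system("./clear_cache.sh")
    timings = []
    n = trials if cold else trials + 1
    for trial in range(n):
        if cold and trial > 0:
            os.system("./clear_cache.sh")
        tic = time()
        load_file(filename)
        timings.append(time() - tic)
    if not cold:
        timings = timings[1:]   # the first warm trial is discarded
    return timings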
Writes faster than reads: not so fast!
These benchmarks do not address the performance of file writers, but timing writes is worth some discussion. The IPython %time magic function is tempting for timing how long it takes to write a file:
%time my_write_function("myfile")
However, it is unreliable because the writes may still be cached in memory after the function returns. Moreover, timing a read of the file that was just written gives a bad estimate of cold read performance: some of the file contents may actually be read from memory. It is also a bad estimate of warm read performance, because the OS may be flushing the write cache at the same time it reads from the file.
In the example below, it seemingly took 20 seconds to copy 45 GB of files to a general-purpose EBS volume.
$ sync
$ time cp /raid0-2/*.csv /public
real 0m20.492s
user 0m0.001s
sys 0m0.002s
A write throughput of 2.25 GB/s well exceeds the volume's bandwidth of 250 MB/s. The sync command ensures all cached writes are flushed to disk.
$ time sync
real 3m50.049s
user 0m0.001s
sys 0m0.003s
Unsurprisingly, it takes about 3 minutes and 50 seconds to complete. A throughput estimate of 195 MB/s is more reasonable. However, some writes may still go untimed. Timing the write and the sync together gives a more reliable estimate:
import os
from time import time

os.system("sync")            # flush any pending writes first
tic = time()
my_write_function("myfile")
os.system("sync")            # make sure our writes reach the disk
toc = time()
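The toc - tic interval then covers both the write and the flush of the write cache, so the computed throughput is consistent with the sync experiment above.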
Tasks
We tested each method on three tasks:
- load: loading a CSV or binary file.
- sum: summing the loaded data. For text columns, we sum the lengths of the strings.
- copy: copying the loaded data from Spark/Dato/ParaText's internal representation to Python.
Task #1: Loading the Data
The code used to load a CSV or binary file for each method is shown in this section. In our benchmarks, the loading step is timed separately from the transfer step so we can consider their relative overheads independently.
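Each step is timed with simple wall-clock measurements; a minimal sketch of what we mean (load_step and transfer_step stand in for the per-method snippets and are not the benchmark's actual function names):
from time import time

tic = time()
handle = load_step(params)      # one of the load snippets below
load_time = time() - tic

tic = time()
data = transfer_step(handle)    # e.g. the Task #3 copy-to-Python step
transfer_time = time() - tic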
Wise.io ParaText
In the transfer step, we convert to a dictionary of arrays rather than a DataFrame, because the pandas DataFrame constructor can add as much as 2x to peak memory, which is beyond our control.
We time the following for the load step:
loader = paratext.internal_create_csv_loader(**params)
This completes all the reads for the benchmark; the contents of the CSV file are fully loaded into memory on the C++ side.
DataBricks SparkCSV
sdf = sqlContext.read.format('com.databricks.spark.csv') \
                     .option('header', str(bool(header)).lower()) \
                     .option('inferschema', 'true') \
                     .load(filename)
Dato SFrame
sf = sframe.SFrame.read_csv(params["filename"], header=header, column_type_hints=dtypes)
R read.csv
df = read.csv(filename)
R data.table fread
df = fread(filename)
R readr read_csv
df = read_csv(filename)
NumPy loadtxt
if params.get("header", True):
    skiprows = 1
else:
    skiprows = 0
X = np.loadtxt(params["filename"], skiprows=skiprows, delimiter=',')
Pandas read_csv
df = pandas.read_csv(params["filename"], header=header)
HDF5 via H5Py
f = h5py.File(params["filename"])
ds = f[params["dataset"]]
X = ds[:, :]
Feather
df = feather.read_dataframe(params["filename"])
NumPy NPY
X = np.load(params["filename"])
Pickle
fid = open(params["filename"], "rb")  # pickle files must be opened in binary mode
df = pickle.load(fid)
fid.close()
Task #2: Summing the Columns
Some binary readers memory-map a file and lazily "read" it. Therefore, timing a load function is not always a reliable indicator of a loader's performance on a cold disk. To ensure all the data is read, we sum the numeric columns and sum the lengths of the strings in the text columns.
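NumPy's own .npy loader illustrates the issue: with mmap_mode, np.load returns almost immediately and the data is only paged in when touched (an illustration, not one of the benchmarked configurations):
import numpy as np

X = np.load("myfile.npy", mmap_mode='r')  # returns quickly; nothing read yet
total = X.sum()                           # forces every page in from disk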
DataBricks SparkCSV
def sum_spark_dataframe(df):
    from pyspark.sql import functions as F
    str_columns = [field.name for field in df.schema.fields
                   if str(field.dataType) == "StringType"]
    num_columns = [field.name for field in df.schema.fields
                   if str(field.dataType) != "StringType"]
    str_length_sums = [F.sum(F.length(df[k])) for k in str_columns]
    num_sums = [F.sum(df[k]) for k in num_columns]
    sums = str_length_sums + num_sums
    s = df.agg(*sums).collect()
    return s
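Building the full list of aggregate expressions and passing them to a single agg call lets Spark compute every column sum in one pass over the data.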
Dato SFrame
def sum_sframe(df):
    s = {}
    for key in df.column_names():
        if df[key].dtype() in (str, unicode):
            s[key] = df[key].apply(lambda x: len(x)).sum()
        else:
            s[key] = df[key].sum()
    return s
Pandas DataFrame
In Pandas, a column can mix numbers and strings, which breaks df[key].sum(). There is no easy way to check whether a column has a fixed type, so we fall back on try/except blocks.
def sum_dataframe(df):
    s = {}
    for key in df.keys():
        if df[key].dtype in (str, unicode, object):
            try:
                s[key] = df[key].apply(lambda x: len(x)).sum()
            except:
                try:
                    s[key] = df[key].values.sum()
                except:
                    s[key] = np.nan
        else:
            s[key] = df[key].values.sum()
    return s
We found df[key].sum() is considerably slower than df[key].values.sum().
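The difference is that df[key].values.sum() sums the underlying NumPy array directly, bypassing pandas' missing-value handling.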
Dictionary of Arrays (DictFrame)
def sum_dictframe(d, levels):
    s = {}
    for key in d.keys():
        if key in levels.keys():
            level_sums = np.array([len(el) for el in levels[key]])
            s[key] = level_sums[d[key]].sum()
        else:
            s[key] = d[key].sum()
    return s
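A hypothetical example of the representation: a categorical column stores integer codes, and levels maps each code to its string.
import numpy as np

d = {"price": np.array([1.5, 2.0, 3.5]),
     "color": np.array([0, 1, 0])}    # integer category codes
levels = {"color": ["red", "blue"]}   # code -> category string

print(sum_dictframe(d, levels))
# {'price': 7.0, 'color': 10}  # 10 = len("red") + len("blue") + len("red")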
NumPy Arrays
def sum_ndarray(X):
    s = {}
    rows, cols = X.shape
    for col in xrange(0, cols):
        if type(X[0, col]) in (str, unicode):
            s[col] = pandas.Series(X[:, col]).apply(len).sum()
        else:
            s[col] = X[:, col].sum()
    return s
R DataFrames
# Sum the numeric columns directly; for the remaining columns,
# sum the character lengths.
s <- colSums(Filter(is.numeric, df))
s2 <- apply(Filter(function(x){!is.numeric(x)}, df), 2, function(x){sum(nchar(x))})
Task #3: Copy to Python
The third task tests the interactive data science experience. We measure the latency and memory footprint incurred when converting a data frame to a Python data structure.
Wise ParaText
transfer = paratext.internal_csv_loader_transfer(loader, forget=True)
dict_frame = {}
levels = {}
for column_name, column_data, column_semantics, column_levels in transfer:
    if column_semantics == 'cat':
        levels[column_name] = column_levels
    dict_frame[column_name] = column_data
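The resulting dict_frame and levels pair is the dictionary-of-arrays (DictFrame) representation summed in Task #2.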
DataBricks SparkCSV
pdf = sdf.toPandas()
Dato SFrame
pdf = sf.to_dataframe()