"""
Pandas Save/Load Utils and Patterns

Includes thin wrappers around pandas I/O routines with project defaults and
file-based DataFrame serializers (parquet, csv, json-lines).
"""

__author__ = "Elisha Yadgaran"
from io import StringIO
from os import makedirs
from os.path import dirname, isfile, join
from typing import Any, Dict, Optional
import pandas as pd
from simpleml.registries import FILEPATH_REGISTRY
from simpleml.save_patterns.base import BaseSerializer
from simpleml.utils.configuration import (
CSV_DIRECTORY,
JSON_DIRECTORY,
PARQUET_DIRECTORY,
)
class PandasPersistenceMethods(object):
    """
    Base class for internal Pandas serialization/deserialization options

    Wraps pd.DataFrame methods with sensible defaults:
    - file writers create missing parent directories and honor an
      ``overwrite`` flag (when False an existing file is left untouched)
    - csv and json-records round trips persist the index in a column named
      ``INDEX_COLUMN`` and restore it on read

    https://pandas.pydata.org/docs/reference/io.html
    https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html
    """

    # Column used to persist the index for formats (csv, json records) that
    # would otherwise drop it or write it without a stable label
    INDEX_COLUMN = "simpleml_index"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _prepare_destination(filepath: str, overwrite: bool) -> bool:
        """
        Ensure the parent directory of `filepath` exists and report whether
        the caller should write the file.

        :param filepath: destination file path
        :param overwrite: when False, an existing file at `filepath` is kept
        :return: True if the file should be (re)written, False to skip
        """
        directory = dirname(filepath)
        # dirname("file.csv") == "" and makedirs("") raises, so only create a
        # directory when the path actually contains one
        if directory:
            makedirs(directory, exist_ok=True)
        return overwrite or not isfile(filepath)

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------
    @classmethod
    def read_csv(cls, filename: str, **kwargs) -> pd.DataFrame:
        """Helper method to read in a csv file, restoring a persisted index"""
        df = pd.read_csv(filename, **kwargs)
        if cls.INDEX_COLUMN in df.columns:
            df = df.set_index(cls.INDEX_COLUMN)
        return df

    @staticmethod
    def read_parquet(filepath: str, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_parquet"""
        return pd.read_parquet(filepath, **kwargs)

    @staticmethod
    def read_hdf(filepath: str, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_hdf"""
        return pd.read_hdf(filepath, **kwargs)

    @staticmethod
    def read_orc(filepath: str, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_orc"""
        return pd.read_orc(filepath, **kwargs)

    @classmethod
    def read_json(
        cls, filepath: str, orient: str = "records", lines: bool = True, **kwargs
    ) -> pd.DataFrame:
        """
        Read a json file (json-lines records by default), restoring the index
        from `INDEX_COLUMN` when present
        """
        df = pd.read_json(filepath, orient=orient, lines=lines, **kwargs)
        if cls.INDEX_COLUMN in df.columns:
            df = df.set_index(cls.INDEX_COLUMN)
        return df

    @staticmethod
    def read_fwf(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_fwf"""
        return pd.read_fwf(*args, **kwargs)

    @staticmethod
    def read_html(*args, **kwargs) -> list:
        """Passthrough to pd.read_html (returns a list of DataFrames, one per table)"""
        return pd.read_html(*args, **kwargs)

    @staticmethod
    def read_xml(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_xml"""
        return pd.read_xml(*args, **kwargs)

    @staticmethod
    def read_clipboard(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_clipboard"""
        return pd.read_clipboard(*args, **kwargs)

    @staticmethod
    def read_excel(*args, **kwargs) -> pd.DataFrame:
        """
        Passthrough to pd.read_excel (may return a dict of DataFrames when
        `sheet_name` selects several sheets)
        """
        return pd.read_excel(*args, **kwargs)

    @staticmethod
    def read_feather(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_feather"""
        return pd.read_feather(*args, **kwargs)

    @staticmethod
    def read_stata(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_stata"""
        return pd.read_stata(*args, **kwargs)

    @staticmethod
    def read_sas(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_sas"""
        return pd.read_sas(*args, **kwargs)

    @staticmethod
    def read_spss(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_spss"""
        return pd.read_spss(*args, **kwargs)

    @staticmethod
    def read_pickle(*args, **kwargs) -> pd.DataFrame:
        """
        Passthrough to pd.read_pickle

        NOTE: unpickling executes arbitrary code - only load trusted files
        """
        return pd.read_pickle(*args, **kwargs)

    @staticmethod
    def read_sql(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_sql"""
        return pd.read_sql(*args, **kwargs)

    @staticmethod
    def read_bigquery(*args, **kwargs) -> pd.DataFrame:
        """
        Passthrough to pd.read_gbq

        NOTE(review): pd.read_gbq is deprecated in recent pandas releases in
        favor of pandas_gbq.read_gbq - confirm against the installed version
        """
        return pd.read_gbq(*args, **kwargs)

    @staticmethod
    def read_sql_table(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_sql_table"""
        return pd.read_sql_table(*args, **kwargs)

    @staticmethod
    def read_table(*args, **kwargs) -> pd.DataFrame:
        """Passthrough to pd.read_table"""
        return pd.read_table(*args, **kwargs)

    @staticmethod
    def read_sql_query(query: str, connection, **kwargs) -> pd.DataFrame:
        """Helper method to read in sql data"""
        return pd.read_sql_query(query, connection, **kwargs)

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------
    @staticmethod
    def df_to_sql(
        engine,
        df: pd.DataFrame,
        table: str,
        dtype: Optional[Dict[str, str]] = None,
        schema: str = "public",
        if_exists: str = "replace",
        sep: str = "|",
        encoding: str = "utf8",
        index: bool = False,
    ) -> None:
        """
        Utility to bulk insert pandas dataframe via `copy from`

        The table is created (or replaced/appended per `if_exists`) with
        `DataFrame.to_sql` using an empty frame, then rows are streamed as CSV
        through a PostgreSQL ``COPY ... FROM STDIN`` statement.

        :param engine: presumably a SQLAlchemy engine; it must expose
            `raw_connection()` whose cursor supports `copy_expert`
            (psycopg2) - confirm for other drivers
        :param df: dataframe to insert
        :param table: destination table
        :param dtype: column schema of destination table
        :param schema: destination schema (None uses the connection default)
        :param if_exists: what to do if destination table exists; valid inputs are:
            [`replace`, `append`, `fail`]
        :param sep: separator key between cells
        :param encoding: character encoding passed to `to_csv`
            (NOTE(review): likely a no-op when writing to an in-memory text buffer)
        :param index: whether to output index with data
        """
        NULL_STRING = "SIMPLEML_NULL"

        def quote_identifier(name) -> str:
            # Double embedded quotes so a name containing `"` cannot break out
            # of the quoted identifier
            return '"{}"'.format(str(name).replace('"', '""'))

        def quote_literal(value: str) -> str:
            # Standard SQL string literal escaping
            return "'{}'".format(value.replace("'", "''"))

        empty = df.head(0)

        # Create Table (schema only - rows are streamed via COPY below)
        empty.to_sql(
            table,
            con=engine,
            if_exists=if_exists,
            index=index,
            schema=schema,
            dtype=dtype,
        )

        # The COPY column list must match the CSV payload. With index=True the
        # CSV leads with the index level(s); reset_index names them the same
        # way to_sql does by default (level names, else "index"/"level_N")
        columns = empty.reset_index().columns if index else empty.columns

        if schema is None:
            target = quote_identifier(table)
        else:
            target = "{}.{}".format(quote_identifier(schema), quote_identifier(table))

        copy_sql = (
            "COPY {target} ({columns}) FROM STDIN "
            "WITH (FORMAT CSV, NULL {null}, DELIMITER {sep})"
        ).format(
            target=target,
            columns=", ".join(quote_identifier(column) for column in columns),
            null=quote_literal(NULL_STRING),
            sep=quote_literal(sep),
        )

        # Prepare data
        output = StringIO()
        df.to_csv(
            output,
            sep=sep,
            header=False,
            encoding=encoding,
            index=index,
            na_rep=NULL_STRING,
        )
        output.seek(0)

        # Insert data; always release the cursor and connection, even on error
        connection = engine.raw_connection()
        try:
            cursor = connection.cursor()
            try:
                # Use copy expert for CSV formatting (handles character
                # escapes, copy_from does not)
                cursor.copy_expert(copy_sql, output)
            finally:
                cursor.close()
            connection.commit()
        finally:
            connection.close()

    @classmethod
    def to_pickle(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_pickle (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_pickle(filepath, **kwargs)

    @classmethod
    def to_csv(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """
        Write `df` as csv with the index labeled `INDEX_COLUMN` so `read_csv`
        can restore it (skipped if file exists and not `overwrite`)
        """
        if cls._prepare_destination(filepath, overwrite):
            df.to_csv(filepath, index_label=cls.INDEX_COLUMN, **kwargs)

    @staticmethod
    def to_clipboard(df: pd.DataFrame, overwrite: bool = True, **kwargs) -> None:
        """
        Copy `df` to the system clipboard

        `overwrite` is accepted for signature parity with the file writers
        and is ignored.
        """
        df.to_clipboard(**kwargs)

    @classmethod
    def to_excel(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_excel (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_excel(filepath, **kwargs)

    @classmethod
    def to_json(
        cls,
        df: pd.DataFrame,
        filepath: str,
        overwrite: bool = True,
        lines: bool = True,
        orient: str = "records",
        **kwargs,
    ) -> None:
        """
        Write `df` as json (json-lines records by default)

        Records do not carry the index, so unless `df` already has an
        `INDEX_COLUMN` column the index is injected as that column for
        `read_json` to restore. Skipped if file exists and not `overwrite`.
        """
        if not cls._prepare_destination(filepath, overwrite):
            return

        if cls.INDEX_COLUMN not in df.columns:
            if df.index.nlevels == 1:
                # rename_axis labels the new column regardless of any existing
                # index name and avoids clashing with a column named "index"
                df = df.rename_axis(cls.INDEX_COLUMN).reset_index()
            else:
                df = df.reset_index(drop=False).rename(
                    columns={"index": cls.INDEX_COLUMN}
                )

        df.to_json(filepath, orient=orient, lines=lines, **kwargs)

    @classmethod
    def to_html(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_html (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_html(filepath, **kwargs)

    @classmethod
    def to_xml(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_xml (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_xml(filepath, **kwargs)

    @classmethod
    def to_latex(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_latex (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_latex(filepath, **kwargs)

    @classmethod
    def to_feather(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_feather (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_feather(filepath, **kwargs)

    @classmethod
    def to_parquet(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_parquet (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_parquet(filepath, **kwargs)

    @classmethod
    def to_stata(
        cls, df: pd.DataFrame, filepath: str, overwrite: bool = True, **kwargs
    ) -> None:
        """Write `df` with DataFrame.to_stata (skipped if file exists and not `overwrite`)"""
        if cls._prepare_destination(filepath, overwrite):
            df.to_stata(filepath, **kwargs)
class PandasParquetSerializer(BaseSerializer):
    """
    Serializer persisting a pandas DataFrame as a parquet file beneath a
    directory resolved through FILEPATH_REGISTRY.
    """

    @staticmethod
    def serialize(
        obj: pd.DataFrame,
        filepath: str,
        format_directory: str = PARQUET_DIRECTORY,
        format_extension: str = ".parquet",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Write `obj` to ``<registry root>/<format_directory>/<filepath><format_extension>``.

        :param obj: dataframe to persist
        :param filepath: base name (without extension) of the artifact
        :param format_directory: subdirectory grouping files of this format
        :param format_extension: suffix appended to `filepath`
        :param destination_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: relative path written and the registry key it is relative to,
            in the shape `deserialize` accepts
        """
        relative_path = join(format_directory, filepath + format_extension)
        root_directory = FILEPATH_REGISTRY.get(destination_directory)
        PandasPersistenceMethods.to_parquet(obj, join(root_directory, relative_path))
        return {"filepath": relative_path, "source_directory": destination_directory}

    @staticmethod
    def deserialize(
        filepath: str, source_directory: str = "system_temp", **kwargs
    ) -> Dict[str, pd.DataFrame]:
        """
        Load a parquet artifact relative to a registered root directory.

        :param filepath: path relative to the registry root
        :param source_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: mapping with the loaded dataframe under "obj"
        """
        root_directory = FILEPATH_REGISTRY.get(source_directory)
        frame = PandasPersistenceMethods.read_parquet(join(root_directory, filepath))
        return {"obj": frame}
class PandasCSVSerializer(BaseSerializer):
    """
    Serializer persisting a pandas DataFrame as a csv file beneath a
    directory resolved through FILEPATH_REGISTRY. The index is written under
    a dedicated label by PandasPersistenceMethods.to_csv.
    """

    @staticmethod
    def serialize(
        obj: pd.DataFrame,
        filepath: str,
        format_directory: str = CSV_DIRECTORY,
        format_extension: str = ".csv",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Write `obj` to ``<registry root>/<format_directory>/<filepath><format_extension>``.

        :param obj: dataframe to persist
        :param filepath: base name (without extension) of the artifact
        :param format_directory: subdirectory grouping files of this format
        :param format_extension: suffix appended to `filepath`
        :param destination_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: relative path written and the registry key it is relative to,
            in the shape `deserialize` accepts
        """
        relative_path = join(format_directory, filepath + format_extension)
        root_directory = FILEPATH_REGISTRY.get(destination_directory)
        PandasPersistenceMethods.to_csv(obj, join(root_directory, relative_path))
        return {"filepath": relative_path, "source_directory": destination_directory}

    @staticmethod
    def deserialize(
        filepath: str, source_directory: str = "system_temp", **kwargs
    ) -> Dict[str, pd.DataFrame]:
        """
        Load a csv artifact relative to a registered root directory.

        :param filepath: path relative to the registry root
        :param source_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: mapping with the loaded dataframe under "obj"
        """
        root_directory = FILEPATH_REGISTRY.get(source_directory)
        frame = PandasPersistenceMethods.read_csv(join(root_directory, filepath))
        return {"obj": frame}
class PandasJSONSerializer(BaseSerializer):
    """
    Serializer persisting a pandas DataFrame as a json-lines file beneath a
    directory resolved through FILEPATH_REGISTRY.
    """

    @staticmethod
    def serialize(
        obj: pd.DataFrame,
        filepath: str,
        format_directory: str = JSON_DIRECTORY,
        format_extension: str = ".jsonl",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Write `obj` to ``<registry root>/<format_directory>/<filepath><format_extension>``.

        :param obj: dataframe to persist
        :param filepath: base name (without extension) of the artifact
        :param format_directory: subdirectory grouping files of this format
        :param format_extension: suffix appended to `filepath`
        :param destination_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: relative path written and the registry key it is relative to,
            in the shape `deserialize` accepts
        """
        relative_path = join(format_directory, filepath + format_extension)
        root_directory = FILEPATH_REGISTRY.get(destination_directory)
        PandasPersistenceMethods.to_json(obj, join(root_directory, relative_path))
        return {"filepath": relative_path, "source_directory": destination_directory}

    @staticmethod
    def deserialize(
        filepath: str, source_directory: str = "system_temp", **kwargs
    ) -> Dict[str, pd.DataFrame]:
        """
        Load a json-lines artifact relative to a registered root directory.

        :param filepath: path relative to the registry root
        :param source_directory: FILEPATH_REGISTRY key of the root directory
        :param kwargs: accepted for interface compatibility; unused
        :return: mapping with the loaded dataframe under "obj"
        """
        root_directory = FILEPATH_REGISTRY.get(source_directory)
        frame = PandasPersistenceMethods.read_json(join(root_directory, filepath))
        return {"obj": frame}