Skip to content

data.sync.local

Data - Sync - Local¤

LocalStorage ¤

A model for local storage

is_in_storage(self, record_identifier, record_identifier_lookup_paths) ¤

Check if the record is already in storage

Source code in dietbox/data/sync/local.py
def is_in_storage(self, record_identifier, record_identifier_lookup_paths):
    """Check whether a record matching ``record_identifier`` already exists.

    :param record_identifier: identifier to look up; coerced to str and
        compared case-insensitively
    :param record_identifier_lookup_paths: one lookup path (str) or a list of
        paths used to extract the identifier from each stored record
    :return: dict with keys ``exists`` (bool) and ``record`` (the matching
        record dict, or None when no match)
    """
    if isinstance(record_identifier_lookup_paths, str):
        record_identifier_lookup_paths = [record_identifier_lookup_paths]

    if not isinstance(record_identifier, str):
        logger.warning("Input data is not string")
        try:
            record_identifier = str(record_identifier)
        except Exception as ee:
            logger.error(
                f"Could not convert input {record_identifier} to string! {ee}"
            )
            return {"exists": False, "record": None}

    record_identifier = record_identifier.lower()

    # Lazily populate the in-memory cache. The original assigned the return
    # value of load_records() and then immediately overwrote it with
    # self.records, relying on the keep_in_memory=True side effect; make
    # that dependency explicit.
    if not self.records:
        self.load_records()
    all_existing_records = self.records

    for record in all_existing_records:
        for record_identifier_lookup_path in record_identifier_lookup_paths:
            record_company = _get_value_in_dict_recursively(
                record, record_identifier_lookup_path
            )
            if record_company:
                # assumes the looked-up value is a string — TODO confirm
                record_company = record_company.lower()
                if record_identifier == record_company:
                    return {"exists": True, "record": record}

    return {"exists": False, "record": None}

load_records(self, keep_in_memory=True) ¤

Load records for target

Source code in dietbox/data/sync/local.py
def load_records(self, keep_in_memory=True):
    """Read all records for the configured target.

    Delegates to the module-level ``load_records`` helper and, when
    ``keep_in_memory`` is True, caches the result on ``self.records``.

    :param keep_in_memory: cache the loaded records on the instance
    :return: list of record dicts
    """
    fetched = load_records(self.target)
    if keep_in_memory:
        self.records = fetched
    return fetched

save_records(self, record) ¤

Save records in target

Source code in dietbox/data/sync/local.py
def save_records(self, record):
    """Save a record to the target, skipping records already in storage.

    :param record: dict with at least a "company" key identifying the record
    :return: None
    """
    company = record.get("company")

    # Bug fix: is_in_storage() requires the lookup path(s) as a second
    # positional argument; the original call omitted it and raised TypeError.
    # "company" mirrors the key read above — NOTE(review): confirm the lookup
    # path against other callers.
    if self.is_in_storage(company, "company").get("exists"):
        logger.debug(f"{company} already exists! No need to save again!")
        # Bug fix: the original fell through here and saved a duplicate anyway.
        return
    save_records(record, self.target, is_flush=True)

cache_dataframe(dataframe, file, engine=None) ¤

Write dataframe to a line-delimited JSON file.

.. warning:: pandas engine doesn't respect the date encoder defined, since it has its own.

Parameters:

Name Type Description Default
dataframe

input pandas dataframe

required
file str

path of file to be written to

required
engine

we have two engines to convert the data to json, pandas and json.

None

Returns:

Type Description
list

if the engine is json, the converted json records are returned for inspections

Source code in dietbox/data/sync/local.py
def cache_dataframe(dataframe, file, engine=None):
    """
    Write dataframe to a line-delimited json file.

    .. warning::
       pandas engine doesn't respect the date encoder defined since it has its own.

    :param dataframe: input pandas dataframe
    :param str file: path of file to be written to
    :param engine: we have two engines to convert the data to json, pandas and json.
    :return: if the engine is json, the converted json records are returned for inspections
    :rtype: list
    """

    if engine is None:
        engine = "json"

    if os.path.isfile(file):
        # NOTE(review): this only logs and then overwrites anyway.
        logger.error("File '{}' exists, overwriting...".format(file))

    if engine == "json":
        res = []
        with open(file, "w") as f:
            for _, row in dataframe.iterrows():
                row_dict = row.to_dict()
                # Bug fix: logging takes lazy %-style arguments, not
                # print-style positionals; the original call never rendered
                # row_dict into the message.
                logger.debug("cache_dataframe:: %s", row_dict)
                res.append(row_dict)
                # Reuse row_dict instead of calling row.to_dict() twice.
                # ignore_nan is a simplejson extension (NaN -> null);
                # presumably `json` here is simplejson — verify the import.
                f.write(
                    json.dumps(row_dict, default=isoencode, ignore_nan=True) + "\n"
                )
        return res
    elif engine == "pandas":
        dataframe.to_json(
            file,
            orient="records",
            lines=True,
            date_format="iso",
            default_handler=isoencode,
        )
    else:
        raise Exception(f"No engine defined for {engine}")

load_records(data_path_inp) ¤

Load data from a line-delimited JSON file. Instead of loading pandas for such a simple job, this function does the work in most cases.

Parameters:

Name Type Description Default
data_path_inp

data file path

required

Returns:

Type Description

list of dicts

Source code in dietbox/data/sync/local.py
def load_records(data_path_inp):
    """Load data from a line deliminated json file. Instead of loading pandas for such a simple job, this function does the work in most cases.

    :param data_path_inp: data file path
    :return: list of dicts
    """

    data = []

    with open(data_path_inp, "r") as fp:
        for line in fp:
            line = line.replace("null", ' "None" ')
            try:
                line_data = json.loads(line.strip())
            except Exception as ee:
                logger.warning("could not load ", line, "\n", ee)
            data.append(line_data)

    return data

save_records(data_inp, output, is_flush=None, write_mode=None) ¤

Save list of dicts to file. Instead of loading pandas for such a simple job, this function does the work in most cases.

:is_flush: whether to flush data to file for each row written to file

Parameters:

Name Type Description Default
data_inp

dict or list of dict to be saved

required
output

path to output file

required

Returns:

Type Description

None

Source code in dietbox/data/sync/local.py
def save_records(data_inp, output, is_flush=None, write_mode=None):
    """Save list of dicts to file. Instead of loading pandas for such a simple job, this function does the work in most cases.

    :param data_inp: dict or list of dict to be saved
    :param output: path to output file
    :param is_flush: whether to flush data to file for each row written to file
    :param write_mode: file open mode; defaults to "a+" (append)
    :return: None
    :raises Exception: if the input is neither list nor dict, or writing fails
    """

    if write_mode is None:
        write_mode = "a+"

    if is_flush is None:
        is_flush = False

    if isinstance(data_inp, list):
        data = data_inp
    elif isinstance(data_inp, dict):
        data = [data_inp]
    else:
        raise Exception("Input data is neither list nor dict: {}".format(data_inp))

    try:
        with open(output, write_mode) as fp:
            for record in data:
                json.dump(record, fp)
                fp.write("\n")
                if is_flush:
                    fp.flush()
    except Exception as ee:
        # Bug fix: message said "load" for a save operation; chain the
        # original exception so the underlying traceback is preserved.
        raise Exception("Could not save data to file: {}".format(ee)) from ee