lab.data.extraction.sql
Lab - Data - Extraction - SQL¤
load_query(sql_file)
¤
load_query loads a sql query file as strings. load_query is best combined with query_data to retrieve data from database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sql_file |
str |
path to sql file |
required |
Returns:
Type | Description |
---|---|
str |
string representation of the sql query |
Source code in dietbox/lab/data/extraction/sql.py
def load_query(sql_file):
"""
load_query loads a sql query file as strings.
load_query is best combined with query_data to retrieve data from database.
:param sql_file: path to sql file
:type sql_file: str
:return: string representation of the sql query
:rtype: str
"""
if not os.path.exists(sql_file):
raise Exception(f"SQL file {sql_file} does not exist!")
with open(sql_file) as fp:
sql = fp.read()
if not sql:
logger.error(f"No content from sql file: {sql_file}")
return sql
query_data(queries, config)
¤
query_data queries data from MySQL remote and save the data in a dataframe.
Input queries should be a list of dictionaries with name
and query
as keys.
The key name
is only used for logs. It is strongly advised to include it.
.. code-block:: python
queries = [{ "name": query_name, "query": query_content }]
Here is an example of the config param.
.. code-block:: python
config = { "sql_hostname": os.getenv("PLATFORM_SQL_URI"), # 'sql_hostname' "sql_username": os.getenv("PLATFORM_SQL_USERNAME"), # 'sql_username' "sql_password": os.getenv("PLATFORM_SQL_PWD"), # 'sql_password' "sql_main_database": 'db_name', # 'db_name' "sql_port": 3306, "ssh_host": 'a.ssh.host.great_host.com', #'ssh_hostname' "ssh_user": 'my_ssh_username', #'ssh_username' "ssh_port": 22, "ssh_key": ssh_key # }
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str |
SQL queries arranged in a list of dictionaries with . |
required |
Returns:
Type | Description |
---|---|
Union[pandas.c,e.frame.DataFrame] |
dataframe of the returned data from the query |
Source code in dietbox/lab/data/extraction/sql.py
def query_data(queries, config):
"""
query_data queries data from MySQL remote and save the data in a dataframe.
Input queries should be a list of dictionaries with `name` and `query` as keys.
The key `name` is only used for logs. It is strongly advised to include it.
.. code-block:: python
queries = [{
"name": query_name,
"query": query_content
}]
Here is an example of the config param.
.. code-block:: python
config = {
"sql_hostname": os.getenv("PLATFORM_SQL_URI"), # 'sql_hostname'
"sql_username": os.getenv("PLATFORM_SQL_USERNAME"), # 'sql_username'
"sql_password": os.getenv("PLATFORM_SQL_PWD"), # 'sql_password'
"sql_main_database": 'db_name', # 'db_name'
"sql_port": 3306,
"ssh_host": 'a.ssh.host.great_host.com', #'ssh_hostname'
"ssh_user": 'my_ssh_username', #'ssh_username'
"ssh_port": 22,
"ssh_key": ssh_key #
}
:param query: SQL queries arranged in a list of dictionaries with .
:type query: str
:return: dataframe of the returned data from the query
:rtype: pandas.core.frame.DataFrame
"""
sql_hostname = config.get("sql_hostname") # 'sql_hostname'
sql_username = config.get("sql_username") # 'sql_username'
sql_password = config.get("sql_password") # 'sql_password'
sql_main_database = config.get("sql_main_database") # 'db_name'
sql_port = config.get("sql_port")
ssh_host = config.get("ssh_host") #'ssh_hostname'
ssh_user = config.get("ssh_user") #'ssh_username'
ssh_port = config.get("ssh_port")
ssh_key = config.get("ssh_key")
if ssh_key is None:
logger.warning("config does not include ssh_key\nUsing default in .ssh/id_rsa")
home = os.path.expanduser("~")
pkeyfilepath = os.path.join(".ssh", "id_rsa")
ssh_key = paramiko.RSAKey.from_private_key_file(
os.path.join(home, pkeyfilepath)
)
if isinstance(ssh_key, str):
logger.warning(
f"input config has ssh_key as strings: {ssh_key}\nConverting to key"
)
ssh_key = paramiko.RSAKey.from_private_key_file(ssh_key)
logger.info("Connecting to Server ... ")
with SSHTunnelForwarder(
(ssh_host, ssh_port),
ssh_username=ssh_user,
ssh_pkey=ssh_key,
remote_bind_address=(sql_hostname, sql_port),
) as tunnel:
sleep(3)
conn = pymysql.connect(
host="127.0.0.1",
user=sql_username,
passwd=sql_password,
db=sql_main_database,
port=tunnel.local_bind_port,
)
logger.info("Connected to the MySQL server")
sleep(1)
datasets = []
for query in queries:
logger.debug(query)
query_query = query.get("query")
logger.info(f"Running query for {query.get('name')}")
data = pd.read_sql_query(query_query, conn)
query["data"] = data.copy()
datasets.append(query)
sleep(10)
conn.close()
return datasets