Source code for ibmcloudsql.catalog_table
"""Catalog Table."""
import logging
import time
from deprecated import deprecated
logger = logging.getLogger(__name__)
try:
from cos import ParsedUrl
from exceptions import (
SqlQueryDropTableException,
SqlQueryFailException,
SqlQueryCreateTableException,
)
except ImportError:
from .cos import ParsedUrl
from .exceptions import (
SqlQueryDropTableException,
SqlQueryFailException,
SqlQueryCreateTableException,
)
class HiveMetastore:
    """This class supports the handling of HIVE catalog tables.

    NOTE(review): the methods below call ``execute_sql``, ``submit_sql``,
    ``run_sql``, ``wait_for_job`` and ``get_result``, none of which are
    defined in this class; they are presumably provided by a subclass or
    mixin partner (the SQL Query client) -- confirm before using this class
    on its own.
    """

    def __init__(self, target_url):
        """Create an instance of the class.

        Parameters
        ----------
        target_url: str
            The COS URL that is used to store temporary data for any SQL Query queries.
        """
        # Name of the table most recently created/altered; used by
        # drop_table() when it is called without a table name.
        self.current_table_name = None
        # keep track of which tables are available
        self.partitioned_tables = set()
        self.regular_tables = set()
        self.sql_stmt_show_template = """
        SHOW TABLES {like}
        INTO {cos_out} STORED AS CSV
        """
        # The default URL where data should be queried
        self.cos_in_url_partitioned = "cos://us-geo/sql/customers_partitioned.csv"
        self.cos_in_url = "cos://us-geo/sql/customers.csv"
        self.sql_stmt_create_template = """
        CREATE TABLE {table_name}
        USING {format_type}
        LOCATION {cos_in}
        """
        # Template used by create_partitioned_table() when the schema is
        # auto-detected. Defined here (rather than lazily inside that method)
        # so the attribute always exists.
        self.sql_stmt_create_partitioned_template = """
        CREATE TABLE {table_name}
        USING {format_type}
        LOCATION {cos_in}
        """
        self._target_url = target_url
        self.supported_format_types = ["PARQUET", "CSV", "JSON"]

    def _is_valid_target_url(self, target_url=None):
        """Raise ValueError if the required COS URL is invalid.

        Parameters
        ----------
        target_url: str, optional
            The URL to check. When None, ``self._target_url`` is checked instead.

        Returns
        -------
        bool
            True if the URL is valid.

        Raises
        ------
        ValueError
            If no URL is available, or it is not a valid COS URL.
        """
        if target_url is None:
            target_url = self._target_url
        if target_url is None or not ParsedUrl().is_valid_cos_url(target_url):
            msg = "Need to define target COS URL"
            if target_url is not None:
                msg = "Not a valid COS URL: {}".format(target_url)
            raise ValueError(msg)
        return True

    def configure(self, target_url):
        """Update the configuration.

        Parameters
        ----------
        target_url: str
            The new target COS URL; it is validated before being stored.

        NOTE(review): passing ``None`` validates the *current* target URL
        (see ``_is_valid_target_url``) and then stores ``None``.
        """
        self._is_valid_target_url(target_url)
        self._target_url = target_url

    @property
    def target_url(self):
        """Return the target COS URL."""
        return self._target_url

    @target_url.setter
    def target_url(self, target_url):
        self._is_valid_target_url(target_url)
        self._target_url = target_url

    # Extended functionality
    def show_tables(self, target_cos_url=None, pattern=None):
        """List the tables available in the Hive Metastore.

        Parameters
        ------------
        target_cos_url: string, optional
            The COS URL where the information about the tables is stored.
            Defaults to ``self.target_url``.
        pattern: str, optional
            If provided, this is a pattern used in name matching,
            e.g. '*cus*', which finds all tables whose name contains 'cus'.

        Returns
        --------
        DataFrame or None
            The result of the ``SHOW TABLES`` query, as returned by
            ``execute_sql``.

        Raises
        -------
        SqlQueryFailException
        KeyError
            Propagated from ``execute_sql`` (after being logged).
        ValueError
            If the COS URL is missing or invalid.
        """
        cos_out = self.target_url if target_cos_url is None else target_cos_url
        self._is_valid_target_url(cos_out)
        like = "LIKE '{}'".format(pattern) if pattern else ""
        sql_stmt_show = self.sql_stmt_show_template.format(like=like, cos_out=cos_out)
        try:
            df, _ = self.execute_sql(sql_stmt_show, get_result=True)
        except (SqlQueryFailException, KeyError):
            logger.info("Fail at SHOW TABLE")
            raise
        return df

    def _catalog_table_exists(self, table_name):
        """Return True if ``table_name`` is listed by :meth:`show_tables`.

        The comparison is an exact, case-insensitive match on the
        ``tableName`` column. For example, ``customers`` does not count as
        existing just because ``customers_partitioned`` does. Errors raised
        by :meth:`show_tables` propagate. An unusable result (None, or no
        ``tableName`` column) is treated as "not found".
        """
        wanted = table_name.strip().lower()
        df = self.show_tables()
        try:
            names = df["tableName"].tolist()
        except (TypeError, KeyError, AttributeError):
            # df is None or does not carry the expected column
            return False
        return wanted in {str(name).strip().lower() for name in names}

    def drop_all_tables(self):
        """Delete all created tables in the project.

        Returns
        -------
        bool
            True [if success]
        """
        df = self.show_tables()
        if df is not None and not df.empty:
            # Iterate the Series values directly; ``Series.iteritems`` was
            # removed in pandas 2.0. drop_table() also updates the tracking sets.
            for table_name in df["tableName"]:
                self.drop_table(table_name)
        self.partitioned_tables = set()
        self.regular_tables = set()
        return True

    def drop_tables(self, table_names):
        """Drop a list of tables.

        Parameters
        ----------
        table_names: list
            A list of table names; each is passed to :meth:`drop_table`.
        """
        for x in table_names:
            self.drop_table(x)

    def drop_table(self, table_name=None):
        """Drop a given table.

        Parameters
        ----------
        table_name: str, optional
            The name of the table.
            If skipped, the currently tracked catalog table
            (``self.current_table_name``) is used and then cleared.

        Returns
        -------
        str:
            Always "completed". A ``SqlQueryFailException`` (or ``KeyError``)
            raised while submitting/waiting for the DROP job is only logged,
            not propagated.

        Raises
        -------
        SqlQueryDropTableException
            When the DROP job finishes in a state other than "completed".
        ValueError
            When no table name is given and none is being tracked.
        """
        if table_name is None and self.current_table_name is None:
            msg = "please provide table_name"
            raise ValueError(msg)
        if table_name is None:
            table_name = self.current_table_name
            self.current_table_name = None
        sql_stmt_drop = """
        DROP TABLE {table_name}""".format(
            table_name=table_name
        )
        try:
            _, job_id = self.execute_sql(sql_stmt_drop)
            logger.debug(
                "Job_id ({stmt}): {job_id}".format(stmt=sql_stmt_drop, job_id=job_id)
            )
            job_status = self.wait_for_job(job_id)
        except SqlQueryFailException:
            msg = "Wrong table name"
            logger.warning(msg)
            return "completed"
        except KeyError:
            # Best-effort behaviour kept from the original implementation.
            logger.debug("KeyError while dropping table %s", table_name, exc_info=True)
            return "completed"
        # Raised outside the try so it can never be swallowed by the
        # SqlQueryFailException handler above.
        if job_status != "completed":
            raise SqlQueryDropTableException(
                "table name {} removed failed".format(table_name)
            )
        # A table is tracked in at most one of the sets; discard() on both
        # avoids the KeyError that previously left regular tables tracked.
        self.partitioned_tables.discard(table_name)
        self.regular_tables.discard(table_name)
        return "completed"

    def create_table(
        self,
        table_name,
        cos_url=None,
        format_type="CSV",
        force_recreate=False,
        blocking=True,
        schema=None,
    ):
        """Create a table for data on COS.

        Parameters
        ----------
        table_name: str
            The name of the table
        cos_url : str, optional
            The COS URL that the table should reference.
            If not provided, it uses the internal `self.cos_in_url`.
        format_type: string, optional
            The type of the data above that you want to reference (default: CSV)
        force_recreate: bool, optional
            (True) force the recreation of an existing table
        blocking: bool, optional
            (True) wait until the job finishes and return its result
        schema: None or string
            If None, then automatic schema detection is used. Otherwise, pass in the comma-separated
            string in the form "(columnName type, columnName type)"

        Returns
        -------
        The result of the CREATE TABLE job (via ``get_result``) when
        ``blocking`` is True and the job completed; otherwise None (job
        failed, table already existed, or ``blocking`` is False).

        Raises
        -------
        ValueError:
            when the argument is invalid for `cos_url` (invalid format or there is no such location),
            or when `schema` is malformed
        SqlQueryDropTableException
            when it cannot remove the given table name
        SqlQueryCreateTableException
            when it cannot create the given table name
        """
        # When cos_url is None this validates the target URL instead, which
        # show_tables() needs anyway.
        self._is_valid_target_url(cos_url)

        def _create_table_async(
            table_name,
            cos_url=None,
            format_type="CSV",
            force_recreate=False,
            schema=None,
        ):
            """
            Create a table asynchronously. This is the async version of :meth:`.create_table`.

            Parameters
            ----------
            table_name: str
                The name of the table
            cos_url : str, optional
                The COS URL that the table should reference.
                If not provided, it uses the internal `self.cos_in_url`.
            force_recreate: bool, optional
                (True) force the recreation of an existing table

            Returns
            ------
            job_id [if the table is being created]
            None [if the table already exists]
            """
            self.current_table_name = table_name
            if cos_url is None:
                cos_url = self.cos_in_url
            # Validate the schema before any destructive operation.
            if schema is not None:
                schema = self._format_schema(schema)
            exists = self._catalog_table_exists(table_name)
            if exists and force_recreate:
                self.drop_table(table_name)
            self.regular_tables.add(table_name)
            if exists and not force_recreate:
                return None
            if schema is None:
                sql_stmt_create = self.sql_stmt_create_template.format(
                    table_name=table_name, cos_in=cos_url, format_type=format_type
                )
            else:
                # explicit schema (no partitioning for regular tables)
                sql_stmt_create = """
                CREATE TABLE {table_name} {schema}
                USING {format_type}
                LOCATION {cos_in}
                """.format(
                    table_name=table_name,
                    cos_in=cos_url,
                    format_type=format_type,
                    schema=schema,
                )
            logger.debug(sql_stmt_create)
            return self.submit_sql(sql_stmt_create)

        job_id = _create_table_async(
            table_name,
            cos_url,
            format_type=format_type,
            force_recreate=force_recreate,
            schema=schema,
        )
        if job_id is not None and blocking is True:
            job_status = self.wait_for_job(job_id)
            if job_status == "completed":
                return self.get_result(job_id)
            return None
        return None

    def _format_schema(self, schema):
        """Format the schema string to ensure it is enclosed by ( and ).

        Parameters
        ----------
        schema: str
            Either "name type, name type" or "(name type, name type)".

        Returns
        -------
        str
            The stripped schema, wrapped in parentheses if it was not already.

        Raises
        ------
        ValueError
            If the schema is empty, or has only one of the enclosing
            parentheses.
        """
        schema = schema.strip()
        if not schema:
            raise ValueError("schema wrong format, should be: (name type, name type)")
        opens = schema.startswith("(")
        closes = schema.endswith(")")
        if opens != closes:
            # Raise instead of `assert`, which is stripped under `python -O`.
            raise ValueError("schema wrong format, should be: (name type, name type)")
        if not opens:
            schema = "(" + schema + ")"
        return schema

    def create_partitioned_table(
        self,
        table_name,
        cos_url=None,
        format_type="CSV",
        force_recreate=False,
        schema=None,
        partition_list=None,
    ):
        """Create a partitioned table for data on COS.

        Note
        -----------
        The data needs to be organized in a form that
        matches the HIVE metastore criteria, e.g.

        .. code-block:: console

            <COS-URL>/field_1=value1_1/field_2=value_2_1/object_file
            <COS-URL>/field_1=value1_2/field_2=value_2_1/object_file

        NOTE: Each time the data is updated, we need to call
        :meth:`.recover_table_partitions`
        on the created partitioned table.

        Parameters
        --------------
        table_name: str
            the name of the table to be created
        cos_url : str, optional
            The COS URL that the table should reference.
            If not provided, it uses the internal `self.cos_in_url_partitioned`.
        format_type: string, optional
            The type of the data above (default: CSV)
        force_recreate: bool
            (True) force the recreation of an existing table
        schema: None or string
            If None, then automatic schema detection is used. Otherwise,
            pass in the comma-separated
            string in the form "(columnName type, columnName type)"
        partition_list: comma-separated string | list | tuple
            the list of columns to be part of the partitioning. NOTE: the order
            matters. Required when `schema` is given.

        Returns
        ----------
        None

        Raises
        -------
        ValueError:
            when the argument is invalid for `cos_url`
            (invalid format or there is no such location),
            `format_type` (invalid value), `schema` (malformed), or when
            `schema` is given without `partition_list`.
            All arguments are validated before any existing table is dropped.
        SqlQueryCreateTableException:
            when the schema cannot be inferred (or the COS URL is wrong)
        SqlQueryFailException:
            when it cannot remove the given table name
        """
        self.current_table_name = table_name
        if cos_url is None:
            cos_url = self.cos_in_url_partitioned
        else:
            self._is_valid_target_url(cos_url)
        if schema is not None and partition_list is None:
            raise ValueError("Please provide `partition_list`")
        # Validate every argument before touching the catalog, so a bad call
        # with force_recreate=True never drops an existing table.
        if format_type.upper() not in self.supported_format_types:
            raise ValueError(
                "Please fix `format_type` to be in {}".format(
                    str(self.supported_format_types)
                )
            )
        if schema is not None:
            schema = self._format_schema(schema)
            if isinstance(partition_list, (list, tuple)):
                partition_list = ", ".join(partition_list)

        exists = self._catalog_table_exists(table_name)
        if exists and force_recreate:
            self.drop_table(table_name)
        self.partitioned_tables.add(table_name)
        if exists and not force_recreate:
            # TODO: update table?
            return

        if schema is None:
            # auto-detection of schema
            sql_stmt_create_partitioned = self.sql_stmt_create_partitioned_template.format(
                table_name=table_name,
                cos_in=cos_url,
                format_type=format_type,
            )
        else:
            # explicit schema -> need to tell "PARTITIONED BY"
            sql_stmt_create_partitioned = """
            CREATE TABLE {table_name} {schema}
            USING {format_type}
            PARTITIONED BY ({partition_list})
            LOCATION {cos_in}
            """.format(
                table_name=table_name,
                cos_in=cos_url,
                format_type=format_type,
                schema=schema,
                partition_list=partition_list,
            )
        logger.debug(sql_stmt_create_partitioned)
        try:
            self.run_sql(sql_stmt_create_partitioned)
        except Exception as e:
            no_schema_error_msg = "Unable to infer schema"
            if no_schema_error_msg in str(e):
                msg = (
                    "Can't infer schema (explicit schema is needed)"
                    " or the COS URL is wrong. Please check"
                )
                raise SqlQueryCreateTableException(msg) from e
            raise
        # NOTE(review): presumably gives the catalog time to register the new
        # table before recovering partitions -- confirm whether still needed.
        time.sleep(2)
        self.recover_table_partitions(table_name)

    def add_partitions(self, table_name, col_names):
        """Update the table with a partition column having new value.

        Parameters
        ----------
        table_name: str
            The partitioned table name
        col_names: str
            Text inserted verbatim into the ALTER TABLE ... ADD ... PARTITIONS statement
        """
        self.current_table_name = table_name
        sql_stmt = """ALTER TABLE {table_name} ADD {col_names} PARTITIONS
        """.format(
            table_name=table_name, col_names=col_names
        )
        self.run_sql(sql_stmt)

    def recover_table_partitions(self, table_name):
        """Update a partitioned table. This step is required after creating a new partitioned table.

        Note
        ------
        You should use this once at the start (the first time you define
        the table) to save some work, but later on, I would recommend using
        :meth:`.add_partitions`

        Parameters
        ----------
        table_name: str
            The partitioned table name
        """
        self.current_table_name = table_name
        sql_stmt = """ALTER TABLE {table_name} RECOVER PARTITIONS
        """.format(
            table_name=table_name
        )
        self.run_sql(sql_stmt)

    def get_schema_table(self, table_name):
        """Return the schema of table.

        Parameters
        ----------
        table_name: str
            Name of the HIVE catalog table

        Returns
        ------
        DataFrame or None [if failed - table not found]
            3 columns: col_name (object), data_type (object), comment (float64)
        """
        return self._describe_table(table_name)

    @deprecated(
        version="1.0",
        reason="Please use get_schema_table [in par with :meth:`.get_schema` from COS URL]",  # noqa
    )
    def describe_table(self, table_name):
        """Return the schema of a table (deprecated alias of :meth:`get_schema_table`)."""
        return self._describe_table(table_name)

    def _describe_table(self, table_name):
        """Run DESCRIBE TABLE into the target COS URL; return its result or None on failure.

        Raises
        ------
        ValueError
            If the target COS URL is missing or invalid (checked before querying).
        """
        self._is_valid_target_url()
        sql_stmt = """
        DESCRIBE TABLE {table_name} INTO {cos_out} STORED AS CSV""".format(
            table_name=table_name, cos_out=self.target_url
        )
        try:
            return self.run_sql(sql_stmt)
        except Exception:
            # Best-effort by contract (returns None); keep a trace for debugging.
            logger.debug("DESCRIBE TABLE %s failed", table_name, exc_info=True)
            return None

    def create_metaindex(self, table_name):
        """Create a metaindex for a given table; return the job result or None on failure."""
        sql_stmt = """CREATE METAINDEX {}""".format(table_name)
        try:
            return self.run_sql(sql_stmt)
        except Exception:
            logger.debug("CREATE METAINDEX %s failed", table_name, exc_info=True)
            return None

    def get_metaindex(self, table_name):
        """Get the metaindex of a table; return the job result or None on failure."""
        sql_stmt = """DESCRIBE METAINDEX ON TABLE {}""".format(table_name)
        try:
            return self.run_sql(sql_stmt)
        except Exception:
            logger.debug("DESCRIBE METAINDEX ON TABLE %s failed", table_name, exc_info=True)
            return None