ibmcloudsql package

Submodules

SQL Query module

class ibmcloudsql.SQLQuery.SQLQuery(api_key, instance_crn, target_cos_url=None, client_info='IBM Cloud SQL Query Python SDK', thread_safe=False, max_concurrent_jobs=4, max_tries=1, iam_max_tries=1)[source]

Bases: ibmcloudsql.cos.COSClient, ibmcloudsql.sql_magic.SQLBuilder, ibmcloudsql.catalog_table.HiveMetastore

The class that provides the necessary APIs to interact with:

  1. IBM SQL Serverless service

  2. IBM COS service

Parameters:
apikey: str, optional

an account-level API key [manage/Access (IAM)/IBM Cloud API keys]

instance_crn: str, optional

CRN from SQLQuery instance

target_cos_url: str, optional

the URI where retrieved data is stored

max_concurrent_jobs: int, optional

the max number of concurrent jobs

client_info: str, optional

User-defined string

max_tries: int, optional

The number of times submit_sql() should try to request CloudSQL before giving up.

iam_max_tries: int, optional

The number of times to request a credential from IAM. By default, the token is returned from a request to iam.cloud.ibm.com, which may not be responsive (i.e. timeout = 5 seconds). This parameter controls how many times to retry.

thread_safe: bool, optional (=False)

If thread_safe is True, a new Session object is created when this object is created.
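
A minimal construction sketch (credential values and the target bucket are placeholders you must supply; it assumes the usual package-level import of SQLQuery):

import ibmcloudsql

sqlClient = ibmcloudsql.SQLQuery(
    api_key="<your-ibm-cloud-api-key>",
    instance_crn="<your-sql-query-instance-crn>",
    target_cos_url="cos://us-south/<bucket>/results/",  # where query results are written
    client_info="My SQL Query client",
    max_concurrent_jobs=4,
)
sqlClient.logon()  # establish a connection to IBM Cloud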

Attributes:
columns_in_unixtime

Return the name of columns whose values are in UNIX timestamp

cos_session

Get the current COS session.

my_jobs

Return information about jobs already queried via get_job()

project_lib

Project: IBM Watson Studio ProjectLib object

target_url

Return the target COS URL.

thread_safe

Check if SQLQuery is in thread-safe mode.

Methods

add_partitions(table_name, col_names)

Update the table with a partition column having new value.

analyze(job_id[, print_msg])

Provides some insights about the data layout from the current SQL statement

analyze_cos_url(cos_url)

return a namedtuple containing the 3 fields

configure([apikey, instance_crn, cos_out_url])

update the configuration

connect_project_lib(project[, file_name])

Connect to an IBM Watson Studio Project’s COS bucket for its own assets

copy_daily_objects(source_cos_url, …[, …])

Copy all objects from a source location in a per day folder on a target location, with a sliding window buffer of days.

copy_objects(source_path, cos_url)

Upload a file to a given COS URL

create_metaindex(table_name)

Create a metaindex for a given table.

create_partitioned_table(table_name[, …])

Create a partitioned table for data on COS.

create_table(table_name[, cos_url, …])

Create a table for data on COS.

delete_empty_objects(cos_url)

Delete zero-size objects. See list_cos_objects() for further details.

delete_objects(cos_url[, dry_run, confirm, …])

delete all objects stored at the given COS URL

delete_result(jobId)

Delete the COS objects created by a given job-id

describe_table(table_name)

Return the schema of a table.

drop_all_tables()

Delete all created tables in the project.

drop_table([table_name])

Drop a given table.

drop_tables(table_names)

Drop a list of tables.

execute_sql(sql_stmt[, pagesize, …])

Extend the behavior of run_sql(). It is a blocking call that waits for the job to finish (unlike submit_sql()), but it has additional features.

export_job_history([cos_url, …])

Export the most recent jobs to COS URL

format_()

Perform string replacement needed so that the final result is a SQL statement that is accepted by IBM Spark SQL

from_cos_(cos_url[, format_type, alias, …])

FROM <cos-url> STORED AS <format_type> AS type [FIELDS TERMINATED BY delimiter] [AS alias] [, <cos-url> AS type [AS alias]]

from_table_(table[, alias])

FROM <table> [AS alias] [, <table> [AS alias]]

from_view_(sql_stmt)

FROM (<sql>)

get_bucket(cos_url)

return the bucket name from COS URL

get_bucket_info(cos_url)

Return the information of the given bucket

get_cos_summary(url)

Return information for the given COS URL (may include bucket + prefix)

get_endpoint(cos_url)

return the endpoint string from COS URL

get_exact_url(cos_url)

convert COS URL from using alias to exact URL

get_job(jobId)

Return the details of the job-id

get_jobs()

Return the up-to-30 most recent jobs from the given Cloud API

get_jobs_count_with_status(status)

return the number of jobs in the SQL Query server for the given status

get_jobs_with_status(job_id_list, status)

return the list of job_id, among those provided, that have the given status

get_metaindex(table_name)

Get the metaindex of a table.

get_number_running_jobs()

return the number of running jobs in the SQL Query server

get_prefix(cos_url)

return the prefix part from COS URL

get_result(jobId[, pagenumber])

Return the queried data from the given job-id

get_schema_data(cos_url[, typ, dry_run])

Return the schema of COS URL

get_schema_table(table_name)

Return the schema of table.

get_sql()

Return the current sql string

group_by_(columns)

GROUP BY <columns>

is_a_supported_storage_type(typ)

check if a type is in a supported format

is_valid_cos_url(cos_url)

Validate if a string is COS URL

join_cos_(cos_url, condition[, typ, alias])

[typ] JOIN <cos-url> [AS alias]

join_table_(table, condition[, typ, alias])

[typ] JOIN <table> [AS alias] ON <condition>

list_cos_objects(cos_url[, size_unit, …])

List all objects in the current COS URL. Also, please read the note about the role of the trailing slash in the COS URL.

list_results(jobId[, wait])

NOTE: A single SQL Query can store the queried data in the COS output in multiple objects/partitions

logon([force])

Establish a connection to IBM Cloud.

order_by_(columns)

ORDER BY <columns>

partition_by_(columns)

PARTITIONED BY <columns>

partition_objects_(num_objects)

PARTITIONED INTO <num> OBJECTS

partition_rows_(num_rows)

PARTITIONED INTO <num> ROWS

print_sql()

print() sql string

process_failed_jobs_until_all_completed(…)

re-send those that are failed - due to the time-out mechanism of SQL Query server

read_project_lib_data([file_name])

read the content from the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

recover_table_partitions(table_name)

Update a partitioned table. This step is required after creating a new partitioned table.

rename_exact_result(jobId[, wait])

A SQL Query can store data into partitioned/paginated multiple objects, or single object.

rename_exact_result_joblist(job_list[, wait])

The bulk mode of rename_exact_result method.

reset_()

Reset and returns the current sql string

run([pagesize, get_result])

run the internal SQL statement provided by SQLBuilder using execute_sql()

run_sql(sql_text[, pagesize])

Submits a SQL job, waits for the job to finish (unlike submit_sql()) and return the result as Pandas DataFrame.

select_(columns)

SELECT <columns>

set_tracking_file(file_name)

provides the file name which is used for tracking multiple SQL requests

show_tables([target_cos_url, pattern])

List the available Hive Metastore.

sql_ui_link()

both print out and also return the string containing SQL Query URL

store_at_(cos_url[, format_type])

INTO <cos-url> STORED AS <type>

submit([pagesize])

run the internal SQL statement that you created using the APIs provided by SQLBuilder

submit_and_track_sql(sql_stmt[, pagesize, …])

Each SQL Query instance is limited in the number of SQL queries that it can handle at a time. This can be a problem when you launch many SQL Query jobs, as the limit may prevent you from completing all of them in one session. The max_tries option when creating the SQL Query client object allows you to re-send a job, but it is still limited to one session. Session time is often limited when using the SQL Query client via Watson Studio, i.e. you lose the session after having no interaction with the notebook for a period of time.

submit_sql(sql_stmt[, pagesize, stored_as])

Asynchronous call - submit and quickly return the job_id.

wait_for_job(jobId[, sleep_time])

It’s possible that the job failed because of a Spark internal error which does not have any status, so “unknown” is added for such cases.

where_(condition)

WHERE <condition> [, <condition>]

with_(table_name, sql_stmt)

WITH <table> AS <sql> [, <table> AS <sql>]

write_project_lib_data([file_name])

write the content to the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

export_tags_for_cos_objects

get_session

analyze(job_id, print_msg=True)[source]

Provides some insights about the data layout from the current SQL statement

Parameters:
job_id: str

The job ID

print_msg: bool, optional

Default is True: print out the hints to the console

.. todo::
  1. new sql only when max_obj_size > 300MB

  2. check if STORED AS is used; if not, suggest adding it to the SQL with PARQUET or JSON

  3. add PARTITIONED … not at the end, but right after STORED AS (which can be missing in the original SQL)

configure(apikey=None, instance_crn=None, cos_out_url=None)[source]

update the configuration

delete_result(jobId)[source]

Delete the COS objects created by a given job-id

Returns:
dataframe

A dataframe with 3 rows and one field named “Deleted Object”

Notes

  • The last entry holds the real data, and the last word also indicates the data format

  • ‘JOBPREFIX NONE’ would avoid having ‘jobid=JOB-ID-NAME’ in the URL

Examples

Delete 3 entries in the output COS

cos://<cos-name>/bucket_name/jobid=<JOB-ID-NUMBER>/
cos://<cos-name>/bucket_name/jobid=<JOB-ID-NUMBER>/_SUCCESS
cos://<cos-name>/bucket_name/jobid=<JOB-ID-NUMBER>/[parquet|csv|json]
execute_sql(sql_stmt, pagesize=None, get_result=False, stored_as=None)[source]

Extend the behavior of run_sql(). It is a blocking call that waits for the job to finish (unlike submit_sql()), but it has the following features:

  1. returning of data (Pandas dataframe) is optional (controlled by the get_result parameter), to help avoid Python runtime memory overload.

    This is also useful when you run SQL statements such as DDLs that don’t produce results at all.

  2. returns a namedtuple, in which result.data is what run_sql() returns, while result.job_id is what submit_sql() returns

  3. raises an exception for a failed job

Parameters:
sql_stmt: str

the SQL statement to run

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

get_result: bool, optional (default=False)

When False (the default), only the job_id is returned, though the call still waits for the job’s completion; you can later retrieve the data using get_result(job_id, pagenumber).

Returns:
namedtuple [data, job_id]
If get_result=True, the behavior is like run_sql(), which materializes the returned data as a pd.DataFrame in memory. The default behavior is the opposite, to avoid unintended memory overload.

Raises:
KeyError

when information about a failed job is missing (job_status, job_id, error, error_message)

SqlQueryFailException

when the sql query fails, e.g. time out on the server side
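
A short usage sketch (assuming the sqlClient object from the constructor example above; the statement reuses the sample COS URLs on this page):

result = sqlClient.execute_sql(
    "SELECT * FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET LIMIT 10",
    get_result=True,        # materialize the data; with the default False only job_id is populated
)
print(result.job_id)        # what submit_sql() returned
print(result.data.head())   # what run_sql() returned, as a pd.DataFrame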

export_job_history(cos_url=None, export_file_prefix='job_export_', export_file_suffix='.parquet')[source]

Export the most recent jobs to COS URL

Parameters:
cos_url: str

A COS URL with prefix, i.e. cos://<cos-name>/<bucket>/<prefix>, where the data will be stored

Raises:
ValueError

if COS URL is invalid

get_job(jobId)[source]

Return the details of the job-id

Returns:
dict

a dict of job details (see keys below)

Raises:
ValueError

when jobId is not correct

HTTPError

when RestAPI request fails

JSONDecodeError

when RestAPI returns a non-JSON compliant result

Notes

'job_id',
'status', : "running", "failed", "completed"
'statement': "SELECT * ..." [the content of SQL Query],
'plan_id' ,
'submit_time',
'resultset_location',
'rows_returned',
'rows_read' ,
'bytes_read' ,
'resultset_format': 'csv',
'end_time': '2020-03-06T21:58:26.274Z',
'user_id': 'username@us.ibm.com'

Examples

{
    "bytes_read": 43058,
    "end_time": "2020-03-08T03:20:39.131Z",
    "job_id": "ab3f7567-280b-40c9-87a9-256b846f89db",
    "plan_id": "ead0f7f5-0c96-40c0-9aae-63c4846d8188",
    "resultset_format": "parquet",
    "resultset_location": "cos://s3.us-south.cloud-object-storage.appdomain.cloud/tuan-sql-result/customer_orders/jobid=ab3f7567-280b-40c9-87a9-256b846f89db",
    "rows_read": 921,
    "rows_returned": 830,
    "statement": "SELECT OrderID, c.CustomerID CustomerID, CompanyName, ContactName, ContactTitle, Address, City, Region, PostalCode, Country, Phone, Fax          EmployeeID, OrderDate, RequiredDate, ShippedDate, ShipVia, Freight, ShipName, ShipAddress,          ShipCity, ShipRegion, ShipPostalCode, ShipCountry FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o,          cos://us-geo/sql/customers.parquet STORED AS PARQUET c          WHERE c.CustomerID = o.CustomerID          INTO cos://us-south/tuan-sql-result/customer_orders STORED AS PARQUET PARTITIONED BY (ShipCountry, ShipCity)",
    "status": "completed",
    "submit_time": "2020-03-08T03:20:00.617Z",
    "user_id": "tmhoangt@us.ibm.com"
}
get_jobs()[source]

Return the up-to-30 most recent jobs from the given Cloud API

Returns:
dataframe

a pd.DataFrame with the fields listed below - up to 30 rows, corresponding to the 30 most recent jobs

Notes

  • get_jobs() is used by export_job_history(cos_out_url), which saves this data to a COS URL

Examples

job_id
status: "running", "failed", "completed"
user_id
statement
resultset_location
submit_time
end_time
rows_read
rows_returned
bytes_read
error
error_message
job_id      status  user_id statement       resultset_location      submit_time     end_time         ...
<long-string> completed     <email-here>    <query-string>  <cos-url-result-location> 2020-02-21T16:19:03.638Z      2020-02-21T16:19:13.691Z

rows_read   rows_returned   bytes_read      error   error_message
1760        29      41499   None    None
get_jobs_count_with_status(status)[source]

return the number of jobs in the SQL Query server for the given status

It has the same limitation as described in get_jobs()

get_jobs_with_status(job_id_list, status)[source]

return the list of job_id, among those provided, that have the given status

Parameters:
job_id_list: list

List of job_id to check

status: str

“completed”, “running”, or “failed”

Returns:
list:

List of job ids

get_number_running_jobs()[source]

return the number of running jobs in the SQL Query server

get_result(jobId, pagenumber=None)[source]

Return the queried data from the given job-id

Parameters:
jobId: int

The value, if not stored, can be retrieved from get_jobs()

pagenumber: int, optional

If the data, from the given job_id is saved in pages/partitions, then this should be a value in the range from 1 to len(self.list_results(job_id))

Returns:
dataframe

The dataframe holding the queried data from a completed job

Examples

curl -XGET \
    --url "https://api.sql-query.cloud.ibm.com/v2/sql_jobs?instance_crn=<YOUR_SQL_QUERY_CRN>" \
    -H "Accept: application/json" \
    -H "Authorization: Bearer <YOUR_BEARER_TOKEN>"  \
    -H "Content-Type: application/json"

"""
{
"jobs": [
    {
    "job_id": "7ebed7f7-00dc-44a2-acfa-5bdb53889648",
    "status": "completed",
    "submit_time": "2018-08-14T08:45:54.012Z",
    "user_id": "user1@ibm.com"
    },
    {
    "job_id": "ffde4c5a-1cc2-448b-b377-43573818e5d8",
    "status": "completed",
    "submit_time": "2018-08-14T08:47:33.350Z",
    "user_id": "user1@ibm.com"
    }
]
}
"""
response = requests.get(
    "https://api.sql-query.cloud.ibm.com/v2/sql_jobs/{}?instance_crn={}".format(jobId, self.instance_crn),
    headers=self.request_headers,
)

if response.status_code == 200 or response.status_code == 201:
    status_response = response.json()

https://cloud.ibm.com/apidocs/sql-query#run-an-sql-job
get_schema_data(cos_url, typ='json', dry_run=False)[source]

Return the schema of COS URL

Parameters:
cos_url: str

The COS URL where data is stored

typ: str, optional

The format type of the data; default is ‘json’. Use one of [‘json’, ‘csv’, ‘parquet’] (case-insensitive).

dry_run: bool, optional

If True, return the internally generated SQL statement without submitting a job.

result_format: string

‘dataframe’ or ‘list’

Returns:
DataFrame

3 columns: name (object), nullable (bool), type (object)

Raises:
ValueError

in any of these scenarios: (1) the target COS URL is not set, (2) invalid type, (3) invalid COS URL
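
A short sketch (assuming sqlClient as above; the COS URL reuses the sample data location from this page):

# dry_run=True returns only the internally generated SQL statement; no job is submitted
print(sqlClient.get_schema_data("cos://us-geo/sql/customers.parquet", typ="parquet", dry_run=True))

# submit the job and receive a DataFrame with the columns: name, nullable, type
schema_df = sqlClient.get_schema_data("cos://us-geo/sql/customers.parquet", typ="parquet")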

is_a_supported_storage_type(typ)[source]

check if a type is in a supported format

list_results(jobId, wait=False)[source]

NOTE: A single SQL Query can store the queried data in the COS output in multiple objects/partitions

When one of those below is used

[ PARTITIONED BY,
PARTITIONED EVERY x ROWS      [implicitly used with pagesize=X option]
]
Parameters:
job_id: str

The Job ID

wait: bool, default:False

If True, wait for the requested job_id to complete to get the information

Returns:
None (or nothing) if the function fails

DataFrame (4 fields: ObjectURL, Size, Bucket, Object) - each row corresponds to the information of one partition

Raises:
ValueError

Notes

To know the number of partitions being used, use ‘len(self.list_results(job_id))’

property my_jobs

Return information about jobs already queried via get_job() issued by this SQLClient class object

This is different from get_jobs()

Returns:
dict
process_failed_jobs_until_all_completed(job_id_list)[source]

Re-send the jobs that failed due to the time-out mechanism of the SQL Query server.

Here, if job_time < 40 minutes, then we re-send.

rename_exact_result(jobId, wait=False)[source]

A SQL Query can store data into partitioned/paginated multiple objects, or single object.

Even with a single object, multiple objects are in fact created: two of them have size 0 (<URL>/_SUCCESS and <URL>/) besides the ones that hold data (<URL>/<data1>, <URL>/<data2>).

This API deletes the two 0-size objects and keeps only the ones that hold data.

Parameters:
job_id: str

A string representation of job_id

wait: bool, optional

The given job_id may not be completed yet, so you have the option to wait for it to complete first.

Default is False

Returns:
None
Raises:
ValueError

If the result of the given job_id was produced using “PARTITIONED BY”, “pagesize=”, or “PARTITIONED EVERY x ROWS”, or if rename_exact_result() has already been applied to this job_id.

rename_exact_result_joblist(job_list, wait=False)[source]

The bulk mode of rename_exact_result method.

Parameters:
job_list: list

A list of job_id

wait: bool, optional

The same meaning as the one used in rename_exact_result()

run(pagesize=None, get_result=False)[source]

run the internal SQL statement provided by SQLBuilder using execute_sql()

run_sql(sql_text, pagesize=None)[source]

Submits a SQL job, waits for the job to finish (unlike submit_sql()) and returns the result as a Pandas DataFrame.

Parameters:
sql_text: str

the SQL statement to run

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

Returns:
pd.DataFrame with the query results.
Raises:
CosUrlNotFoundException

the COS URL is not valid

CosUrlInaccessibleException

the COS URL is inaccessible - no access granted to the given API key

SqlQueryInvalidFormatException

the format provided to COS URL is incorrect

KeyError

the returned error message does not have job_id, error, or error_message

Exception

unexpected exception
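
A blocking-call sketch (assuming sqlClient as above; the statement reuses the sample orders data on this page):

df = sqlClient.run_sql(
    "SELECT OrderID, ShipCountry FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET"
)
print(df.head())  # pd.DataFrame with the query results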

set_tracking_file(file_name)[source]

provides the file name which is used for tracking multiple SQL requests

Notes

This is the local file, which is used when you don’t have access to Watson Studio

sql_ui_link()[source]

Both print out and return the string containing the SQL Query web console URL

submit(pagesize=None)[source]

run the internal SQL statement that you created using the APIs provided by SQLBuilder

submit_and_track_sql(sql_stmt, pagesize=None, file_name=None, force_rerun=False, stored_as=None)[source]

Each SQL Query instance is limited in the number of SQL queries that it can handle at a time. This can be a problem when you launch many SQL Query jobs, as the limit may prevent you from completing all of them in one session. The max_tries option when creating the SQL Query client object allows you to re-send a job, but it is still limited to one session. Session time is often limited when using the SQL Query client via Watson Studio, i.e. you lose the session after having no interaction with the notebook for a period of time.

This API provides the capability to record the information of each launched job in a file_name stored either

  • as an asset in the Watson Studio’s Project.

  • as a regular file in the local machine.

The SQL Query client checks the content of that file to see whether the given sql_stmt has already been launched, and if so, whether it has completed. If it has not completed, the client relaunches the job and updates the content in this file. Otherwise, it skips the sql_stmt.

To check if a sql_stmt has been issued or not, format_sql() transforms the query string into a form suitable for string comparison that is tolerant of white space, new lines, comments, and lower-case or upper-case usage in the query string. This is done by the decorator check_saved_jobs_decorator().

This is beneficial in the scenario where you launch many jobs and don’t want to restart from the beginning.

Parameters:
sql_stmt: str

sql string

pagesize: int, optional

the page size

file_name: str, optional

The file name should be a JSON file, i.e. $file_name.json. You need to provide the file name if

    1. you use a Watson Studio notebook and you haven’t provided it in connect_project_lib(), or

    2. you use a local notebook and you want to use a local file to track it

You don’t need to provide the file name if you’re using Watson Studio and the file name has been provided via connect_project_lib().

force_rerun: bool

Rerun even if the given SQL statement has been previously launched and completed. Be cautious when enabling this flag, as you may end up with duplicated data at the same location. The purpose of this flag is to provide the option to rerun a statement whose previously created data has been deleted.

Notes

To use this API in Watson Studio, the SQL Query client must already be connected to the ProjectLib object via the connect_project_lib() method.

This API makes use of COSClient.connect_project_lib() and COSClient.read_project_lib_data().
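
A minimal tracked-submission sketch (it assumes `project` is the project_lib Project object available in a Watson Studio notebook; the SQL statement and file name are placeholders):

# inside Watson Studio: register a tracking file stored as a project asset
sqlClient.connect_project_lib(project, file_name="tracked_jobs.json")
sqlClient.submit_and_track_sql(
    "SELECT * FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET "
    "INTO cos://us-south/<target-bucket>/orders_copy/ STORED AS PARQUET"
)

# outside Watson Studio: track progress in a local JSON file instead
sqlClient.submit_and_track_sql("<sql-stmt>", file_name="tracked_jobs.json")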

submit_sql(sql_stmt, pagesize=None, stored_as=None)[source]

Asynchronous call - submit and quickly return the job_id.

Parameters:
sql_stmt: str

SQL Query string

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

stored_as: string

The storage format to use, only if ‘INTO … STORED AS …’ is not provided in the SQL statement

Returns:
str

job_id

Raises:
RateLimitedException

when the SQLQuery instance is serving the maximum number of requests

SyntaxError

for both KeyError or HTTPError

Examples

curl -XPOST \
    --url "https://api.sql-query.cloud.ibm.com/v2/sql_jobs?instance_crn=YOUR_SQL_QUERY_CRN" \
    -H "Accept: application/json" \
    -H "Authorization: Bearer YOUR_BEARER_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{"statement":"SELECT firstname FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET WHERE EMPLOYEEID=5 INTO cos://us-geo/target-bucket/q1-results" }'

NOTE:

1. All the headers (-H) can be put into a dictionary and passed to the headers argument of requests.post() API.

2. All the data (-d option) is put into a dictionary and passed to the json argument of requests.post() API.

  • ‘statement’: value is full SQL statement in string

  • ‘resultset_target’ (optional): only needed when there is no ‘INTO’ clause in the query string, and the value must be the COS URL output

sqlData = {'statement': sql_stmt}
request_headers = {'Content-Type': 'application/json'}
request_headers.update({'Accept':'application/json'})
request_headers.update({'User-Agent': self.user_agent})
request_headers.update({'authorization': 'Bearer {}'.format(ro_credentials.token)})
response = requests.post(
    "https://api.sql-query.cloud.ibm.com/v2/sql_jobs?instance_crn={}".format(self.instance_crn),
    headers=request_headers,
    json=sqlData)
"""
{
    "errors": [
        {
        "code": "bad_request",
        "message": "Target url specified in parameter resultset_target and as part of into clause in statement"
        }
    ]
}
{
    "job_id": "e2adca0a-9247-4cfa-ac58-db4b2bc33a01",
    "status": "queued"
}
{
    "status_code": 429,
    "errors": [
        {
        "code": "too_many_requests",
        "message": "This instance is currently running its maximum number of query jobs. Try again later, after at least one of the currently running jobs has completed."
        }
    ]
}
"""
# error code information: https://cloud.ibm.com/apidocs/sql-query
wait_for_job(jobId, sleep_time=2)[source]

It’s possible that the job failed because of a Spark internal error which does not have any status, so “unknown” is added for such cases.

Parameters:
jobId: str

The job-id

sleep_time: int, optional

The time interval to sleep before making a new check if the job is done

Returns:
‘failed’, ‘completed’, or ‘unknown’
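
A sketch of the asynchronous workflow combining submit_sql(), wait_for_job() and get_result() (assuming sqlClient as above; the statement reuses the employees sample from the submit_sql() example):

job_id = sqlClient.submit_sql(
    "SELECT firstname FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET WHERE EMPLOYEEID=5"
)
status = sqlClient.wait_for_job(job_id)   # 'completed', 'failed', or 'unknown'
if status == "completed":
    df = sqlClient.get_result(job_id)     # queried data as a DataFrame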
ibmcloudsql.SQLQuery.check_saved_jobs_decorator(f)[source]

A decorator that loads data from ProjectLib and checks for a completed SQL Query job before deciding to launch it

ibmcloudsql.SQLQuery.save(file_name, data)[source]

Save the file so that it remains intact even if Ctrl-C is pressed

ibmcloudsql.SQLQuery.validate_job_status(f)[source]

Check that the job status input, given via the status argument, is correct

Catalog/Table module

Catalog Table.

class ibmcloudsql.catalog_table.HiveMetastore(target_url)[source]

Bases: object

This class supports handling HIVE catalog tables.

Attributes:
target_url

Return the target COS URL.

Methods

add_partitions(table_name, col_names)

Update the table with a partition column having new value.

configure(target_url)

Update the configuration.

create_metaindex(table_name)

Create a metaindex for a given table.

create_partitioned_table(table_name[, …])

Create a partitioned table for data on COS.

create_table(table_name[, cos_url, …])

Create a table for data on COS.

describe_table(table_name)

Return the schema of a table.

drop_all_tables()

Delete all created tables in the project.

drop_table([table_name])

Drop a given table.

drop_tables(table_names)

Drop a list of tables.

get_metaindex(table_name)

Get the metaindex of a table.

get_schema_table(table_name)

Return the schema of table.

recover_table_partitions(table_name)

Update a partitioned table. This step is required after creating a new partitioned table.

show_tables([target_cos_url, pattern])

List the available Hive Metastore.

add_partitions(table_name, col_names)[source]

Update the table with a partition column having a new value.

configure(target_url)[source]

Update the configuration.

create_metaindex(table_name)[source]

Create a metaindex for a given table.

create_partitioned_table(table_name, cos_url=None, format_type='CSV', force_recreate=False, schema=None, partition_list=None)[source]

Create a partitioned table for data on COS.

Parameters:
table_name: str

the name of the table to be created

cos_url: str, optional

The COS URL that the table should reference. If not provided, it uses the internal self.cos_in_url

format_type: string, optional

The type of the data above (default: CSV)

force_recreate: bool

If True, force recreation of an existing table

schema: None or string

If None, then automatic schema detection is used. Otherwise, pass in the comma-separated string in the form “(columnName type, columnName type)”

partition_list: comma-separated string | list

the list of columns to be part of the partitioning. NOTE: the order matters.

Returns:
None
Raises:
ValueError:

when an argument is invalid: cos_url (invalid format or no such location) or format_type (invalid value)

SqlQueryFailException:

when it cannot remove the given table name

create_table(table_name, cos_url=None, format_type='CSV', force_recreate=False, blocking=True, schema=None)[source]

Create a table for data on COS.

Parameters:
table_name: str

The name of the table

cos_url: str, optional

The COS URL that the table should reference. If not provided, it uses the internal self.cos_in_url

format_type: string, optional

The type of the data above that you want to reference (default: CSV)

force_recreate: bool, optional

If True, force recreation of an existing table

blocking: bool, optional

If True, wait until the result is returned

schema: None or string

If None, then automatic schema detection is used. Otherwise, pass in the comma-separated string in the form “(columnName type, columnName type)”

Returns:
None if the job “failed”; otherwise, a result is returned

Raises:
ValueError:

when an argument is invalid: cos_url (invalid format or no such location) or format_type (invalid value)

SqlQueryDropTableException

when it cannot remove the given table name

SqlQueryCreateTableException

when it cannot create the given table name
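
A short sketch (assuming sqlClient as above, which inherits HiveMetastore; the table name is a placeholder and schema=None triggers automatic schema detection):

sqlClient.create_table(
    "customers",
    cos_url="cos://us-geo/sql/customers.parquet",
    format_type="PARQUET",
    force_recreate=False,
    schema=None,
)
print(sqlClient.describe_table("customers"))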

describe_table(table_name)[source]

Return the schema of a table.

drop_all_tables()[source]

Delete all created tables in the project.

Returns:
bool

True [if success]

drop_table(table_name=None)[source]

Drop a given table.

Parameters:
table_name: str, optional

The name of the table. If skipped, the currently tracked catalog table is used

Returns:
str:

a job status

Raises:
SqlQueryDropTableException

when it cannot remove the given table name

ValueError
drop_tables(table_names)[source]

Drop a list of tables.

Parameters:
table_names: list

A list of tables

get_metaindex(table_name)[source]

Get the metaindex of a table.

get_schema_table(table_name)[source]

Return the schema of table.

Parameters:
table_name: str

Name of the HIVE catalog table

Returns:
DataFrame or None [if failed - table not found]

3 columns: col_name (object), data_type (object), comment (float64)

recover_table_partitions(table_name)[source]

Update a partitioned table. This step is required after creating a new partitioned table.

Parameters:
table_name: str

The partitioned table name
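
A sketch combining both steps (table name, COS URL and partition columns are placeholders modeled on the examples on this page):

sqlClient.create_partitioned_table(
    "customer_orders",
    cos_url="cos://us-south/<bucket>/customer_orders/",
    format_type="PARQUET",
    partition_list="ShipCountry, ShipCity",
)
# required after creating a new partitioned table
sqlClient.recover_table_partitions("customer_orders")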

show_tables(target_cos_url=None, pattern=None)[source]

List the available Hive Metastore.

Parameters:
target_cos_url: string, optional

The COS URL where the information about the tables is stored

pattern: str, optional

If provided, a pattern used for name matching, e.g. ‘cus’, which finds all tables whose names contain ‘cus’

Returns:
DataFrame or None

return None if there is an error.

Raises:
SqlQueryFailException
ValueError
property target_url

Return the target COS URL.

Cloud Object Storage module

class ibmcloudsql.cos.COSClient(cloud_apikey='', cos_url='', client_info='COS Client', staging=False, thread_safe=False, iam_max_tries=1)[source]

Bases: ibmcloudsql.cos.ParsedUrl, utilities.IBMCloudAccess

This class handles the interaction with IBM COS storage

Parameters:
cloud_apikey: str, optional

an account-level API key [manage/Access (IAM)/IBM Cloud API keys]

https://cloud.ibm.com/docs/iam/users_roles.html https://cloud.ibm.com/docs/services/cloud-object-storage?topic=cloud-object-storage-getting-started

cos_url: str, optional

the COS URL where retrieved data is stored

client_info: str, optional

User-defined string

thread_safe: bool, optional (=False)

If thread_safe is True, a new Session object is created when this object is created.
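
A minimal construction sketch (credentials and the COS URL are placeholders; the import follows the documented module path):

from ibmcloudsql.cos import COSClient

cosClient = COSClient(
    cloud_apikey="<your-ibm-cloud-api-key>",
    cos_url="cos://us-south/<bucket>/<prefix>/",
    client_info="COS Client",
)
cosClient.logon()
print(cosClient.get_cos_summary("cos://us-south/<bucket>/<prefix>/"))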

Attributes:
cos_session

Get the current COS session.

project_lib

Project: IBM Watson Studio ProjectLib object

thread_safe

Check if SQLQuery is in thread-safe mode.

Methods

analyze_cos_url(cos_url)

return a namedtuple containing the 3 fields

configure([cloud_apikey])

Update Cloud API key

connect_project_lib(project[, file_name])

Connect to an IBM Watson Studio Project’s COS bucket for its own assets

copy_daily_objects(source_cos_url, …[, …])

Copy all objects from a source location in a per day folder on a target location, with a sliding window buffer of days.

copy_objects(source_path, cos_url)

Upload a file to a given COS URL

delete_empty_objects(cos_url)

Delete zero-size objects. See list_cos_objects() for further details.

delete_objects(cos_url[, dry_run, confirm, …])

delete all objects stored at the given COS URL

get_bucket(cos_url)

return the bucket name from COS URL

get_bucket_info(cos_url)

Return the information of the given bucket

get_cos_summary(url)

Return information for the given COS URL (may include bucket + prefix)

get_endpoint(cos_url)

return the endpoint string from COS URL

get_exact_url(cos_url)

convert COS URL from using alias to exact URL

get_prefix(cos_url)

return the prefix part from COS URL

is_valid_cos_url(cos_url)

Validate if a string is COS URL

list_cos_objects(cos_url[, size_unit, …])

List all objects in the current COS URL. Also, please read the note about the role of the trailing slash in the COS URL.

logon([force])

Establish a connection to IBM Cloud.

read_project_lib_data([file_name])

read the content from the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

write_project_lib_data([file_name])

write the content to the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

export_tags_for_cos_objects

get_session

connect_project_lib(project, file_name=None)[source]

Connect to an IBM Watson Studio Project’s COS bucket for its own assets

Parameters:
project: Project

The project-lib object

file_name: str, optional

The name of the file in ProjectLib’s COS bucket where data will be read/stored

copy_daily_objects(source_cos_url, target_cos_url, buffer_days=1)[source]

Copy all objects from a source location in a per day folder on a target location, with a sliding window buffer of days.

Parameters:
source_cos_url: str

Your source data in format cos://us-south/<bucket-name>/object_path/

target_cos_url: str

Your target data in format cos://us-south/<bucket-name>/object_path/

buffer_days: int

Number of additional days before and after each day for which to copy the landed objects into the single target day folder.

Returns:
None
Raises:
ValueError

if COS URL is invalid

copy_objects(source_path, cos_url)[source]

Upload a file to a given COS URL

Raises:
ValueError

if COS URL is invalid

delete_empty_objects(cos_url)[source]

Delete zero-size objects. Reference to list_cos_objects() for further details.

Raises:
ValueError

if COS URL is invalid

delete_objects(cos_url, dry_run=False, confirm=False, get_result=True)[source]

delete all objects stored at the given COS URL

https://<cos-url>/<bucket>?prefix=<prefix-path>

Parameters:
confirm: bool, default=False

confirm before deleting

get_result: bool, default=True

return the result; this can be slow for a large number of objects

Returns:
pd.DataFrame

A single column dataframe: [“Deleted Object”]

Raises:
ValueError

if COS URL is invalid

Notes

Reference: AWS doc

curl -XGET \
    --url "https://<COS-exact-URL>/<bucket-name>?prefix=<YOUR_PREFIX>"

https://s3.us-south.objectstorage.softlayer.net/sql-query-cos-access-ts?prefix=aiops/Location=us-south/DC=rgdal10/Year=2020/Month=02/Date=06

response.content returns a string in XML format:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Name>sql-query-cos-access-ts</Name>
  <Prefix>aiops/Location=us-south/DC=rgdal10/Year=2020/Month=02/Date=05</Prefix>
  <Marker></Marker>
  <MaxKeys>1000</MaxKeys>
  <Delimiter></Delimiter>
  <IsTruncated>false</IsTruncated>
  <!-- 0 or more <Contents> entries -->
  <Contents>
    <Key>aiops/Location=us-south/DC=rgdal10/Year=2020/Month=02/Date=05/Hour=1/part-00066-fd76d4c7-ea0a-40c3-8170-8f281a19ab5f-attempt_20200310232241_0027_m_000066_0.c000.snappy.parquet</Key>
    <LastModified>2020-03-10T23:26:55.957Z</LastModified>
    <ETag>"9b9c012a341fe7f4b9988c59cab96757"</ETag>
    <Size>1771294485</Size>
    <Owner>
      <ID>899ab340-5a4d-4ae2-a1f7-5e39299735b4</ID>
      <DisplayName>899ab340-5a4d-4ae2-a1f7-5e39299735b4</DisplayName>
    </Owner>
    <StorageClass>STANDARD</StorageClass>
  </Contents>
</ListBucketResult>

As the entity tag (ETag) is a hash of the object, we can use it to reliably check whether the object has changed - better than just file size and modification date. A sketch of code to handle response.content follows.
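
A minimal parsing sketch (it assumes only the ListBucketResult payload shown above and the Python standard library; parse_list_bucket_result is a hypothetical helper name):

import xml.etree.ElementTree as ET

def parse_list_bucket_result(xml_bytes):
    # Extract Key, ETag and Size for each <Contents> entry in a ListBucketResult payload
    ns = {"s3": "http://s3.amazonaws.com/doc/2006-03-01/"}
    root = ET.fromstring(xml_bytes)
    objects = []
    for contents in root.findall("s3:Contents", ns):
        objects.append({
            "Key": contents.findtext("s3:Key", namespaces=ns),
            "ETag": contents.findtext("s3:ETag", default="", namespaces=ns).strip('"'),
            "Size": int(contents.findtext("s3:Size", default="0", namespaces=ns)),
        })
    return objects

# e.g. parse_list_bucket_result(response.content)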

export_tags_for_cos_objects(cos_url, export_target_cos_file)[source]
get_bucket_info(cos_url)[source]

Return the information of the given bucket

Raises:
ValueError

if invalid COS URL

Examples

curl https://config.cloud-object-storage.cloud.ibm.com/v1/b/my-bucket \
    -H 'authorization: bearer <IAM_token>'

200 status code:

# NOTE: long strings are broken using ( sub1, sub2)
{
"name": "my-new-bucket",
"crn": ("crn:v1:bluemix:public:cloud-object-storage:global:"
    "a/ 3bf0d9003abfb5d29761c3e97696b71c:xxxxxxx-6c4f-4a62-a165-696756d63903:bucket:my-new-bucket"),
"service_instance_id": "xxxxxxx-6c4f-4a62-a165-696756d63903",
"service_instance_crn": ("crn:v1:bluemix:public:cloud-object-storage:global"
    ":a/3bf0d9003abfb5d29761c3e97696b71c:xxxxxxx-6c4f-4a62-a165-696756d63903::"),
"time_created": "2018-03-26T16:23:36.980Z",
"time_updated": "2018-10-17T19:29:10.117Z",
"object_count": 764265234,
"bytes_used": 28198745752445144
}
get_cos_summary(url)[source]

Return information for the given COS URL (may include bucket + prefix)

Returns:
dict
A dict with the keys: “largest_object”, “largest_object_size”, “newest_object_timestamp”, “oldest_object_timestamp”, “smallest_object”, “smallest_object_size”, “total_objects”, “total_volume”, “url”

Notes
Example: self.get_cos_summary_demo()
list_cos_objects(cos_url, size_unit=None, sort_by_size=False)[source]

List all objects in the current COS URL. Also, please read the note to see the role of trailing slash in the COS URL.

Parameters:
cos_url: str

A URI prefix. e.g., cos://us-south/<bucket-name>/object_path/

size_unit: str, optional (“B” = byte)

A value indicating the unit of the “Size” column: [“B”, “KB”, “MB”, “GB”]

Returns:
pd.DataFrame

The following columns (“Object”, “Size”, “StorageClass”)

Raises:
ValueError

if COS URL is invalid

Notes

Having a trailing slash (/) makes a difference in the returned result, as an asterisk is added at the end. So, “/prefix” would match objects like “/prefix_unexpected/value” as well as “/prefix/expected_value”, while “/prefix/” only matches “/prefix/expected_value”.

Examples

Only ‘list_objects’ works for IBM COS, not ‘list_objects_v2’

IBM-COS-SDK
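
A sketch illustrating the trailing-slash note (assuming cosClient as above; bucket and prefix are placeholders):

# without a trailing slash: also matches objects under "prefix_unexpected/"
df_broad = cosClient.list_cos_objects("cos://us-south/<bucket>/prefix")

# with a trailing slash: matches only objects under "prefix/"
df_exact = cosClient.list_cos_objects("cos://us-south/<bucket>/prefix/", size_unit="MB")
print(df_exact[["Object", "Size", "StorageClass"]])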

property project_lib

IBM Watson Studio ProjectLib object

Type:

Project

read_project_lib_data(file_name=None)[source]

read the content from the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

Parameters:
file_name: str, optional

If not specified, use the one defined from the beginning

write_project_lib_data(file_name=None)[source]

write the content to the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

Parameters:
file_name: str, optional

If not specified, use the one defined from the beginning

class ibmcloudsql.cos.ParsedUrl[source]

Bases: object

Use this class to extract information from COS URL

cos://<cos-name>/<bucket>/<prefix>/

Methods

analyze_cos_url(cos_url)

return a namedtuple containing the 3 fields

get_bucket(cos_url)

return the bucket name from COS URL

get_endpoint(cos_url)

return the endpoint string from COS URL

get_exact_url(cos_url)

convert COS URL from using alias to exact URL

get_prefix(cos_url)

return the prefix part from COS URL

is_valid_cos_url(cos_url)

Validate if a string is COS URL

analyze_cos_url(cos_url)[source]

return a namedtuple containing the 3 fields

  • bucket

  • endpoint

  • prefix

Parameters:
cos_url: str

COS URL
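
A short sketch (assuming cosClient as above, which inherits ParsedUrl; the URL is a placeholder):

parts = cosClient.analyze_cos_url("cos://us-south/<bucket>/<prefix>/")
print(parts.bucket)    # bucket name
print(parts.endpoint)  # endpoint resolved from the alias
print(parts.prefix)    # prefix part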

get_bucket(cos_url)[source]

return the bucket name from COS URL

get_endpoint(cos_url)[source]

return the endpoint string from COS URL

get_exact_url(cos_url)[source]

convert COS URL from using alias to exact URL

get_prefix(cos_url)[source]

return the prefix part from COS URL

is_valid_cos_url(cos_url)[source]

Validate if a string is COS URL

Returns:
bool

class ibmcloudsql.cos.ProjectLib(project, file_name, file_type='json')[source]

Bases: object

This is used by SQLClient/COSClient via read() and write() methods

Parameters:
project: project_lib.Project

The object

from project_lib import Project

file_name: str

The file_name where the data about SQL queries’ jobs should be read/stored

The content of this file is used to track progress

file_type: str, optional

The file format of file_name

.. todo::

NOTE: Currently supports only one file

To support many files, we can switch to using dict such as self._data_out[file_name]

Attributes:
data

file-like object: storing the file-content

project

Project: the project-lib object

Methods

read([file_name, file_type])

Read from project-lib’s file into file-like object

write([file_name, file_type])

Write the file-like data back to project-lib’s file

property data

storing the file-content

Type:

file-like object

property project

the project-lib object

Type:

Project

read(file_name=None, file_type='json')[source]

Read from project-lib’s file into file-like object

Parameters:
file_name: str, optional

File name in the Watson Studio’s project assets. If the file is not provided, then it reads the one passed into the object’s constructor.

file_type: str, optional

The type of file, “json” or “csv”

Returns:
file-like object:

The content of the data, in dict (json) or pd.DataFrame (csv)

write(file_name=None, file_type='json')[source]

Write the file-like data back to project-lib’s file

Parameters:
file_name: str

File name

file_type: str, optional

The type of file, “json” or “csv”

Returns:
dict

Examples

{'asset_id': '1deebaad-8ad3-4861-8c52-e714d8eef2a9',
'bucket_name': 'projectlib351fb93e171c44369663ff79b938828d',
'file_name': 'iris1.csv',
'message': 'File iris1.csv has been written successfully to the associated OS'}

Exceptions module

exception ibmcloudsql.exceptions.CosUrlInaccessibleException(msg, original_exception=None)[source]

Bases: Exception

The error when the Cloud-Object Storage (COS) URL being used is not accessible

exception ibmcloudsql.exceptions.CosUrlNotFoundException(msg, original_exception=None)[source]

Bases: Exception

The error when the Cloud-Object Storage (COS) URL being used is invalid or not accessible

exception ibmcloudsql.exceptions.InternalError502Exception(msg, original_exception=None)[source]

Bases: Exception

The error when SQL Query returns a 502 internal error

exception ibmcloudsql.exceptions.RateLimitedException(msg, original_exception=None)[source]

Bases: Exception

The error when number of requests exceeds the capacity

exception ibmcloudsql.exceptions.SqlQueryCreateTableException(msg, original_exception=None)[source]

Bases: ibmcloudsql.exceptions.SqlQueryFailException

The error raised when a running create-table sql job fails

exception ibmcloudsql.exceptions.SqlQueryCrnInvalidFormatException(msg, original_exception=None)[source]

Bases: Exception

The error when the SQL Query CRN is not correct

exception ibmcloudsql.exceptions.SqlQueryDropTableException(msg, original_exception=None)[source]

Bases: ibmcloudsql.exceptions.SqlQueryFailException

The error raised when a running drop-table sql job fails

exception ibmcloudsql.exceptions.SqlQueryFailException(msg, original_exception=None)[source]

Bases: Exception

The error raised when a running sql job fails, e.g. timeout

exception ibmcloudsql.exceptions.SqlQueryInvalidFormatException(msg, original_exception=None)[source]

Bases: ibmcloudsql.exceptions.SqlQueryFailException

The error raised when the format of COS URL is not valid

exception ibmcloudsql.exceptions.SqlQueryInvalidPlanException(msg, original_exception=None)[source]

Bases: Exception

The error when the used feature is not supported by the current service plan - e.g. need to upgrade to Standard Plan or higher

exception ibmcloudsql.exceptions.UnsupportedStorageFormatException(msg, original_exception=None)[source]

Bases: Exception

The error when the SQL uses a format of data that has not been supported yet

Magic module

class ibmcloudsql.sql_magic.SQLBuilder[source]

Bases: ibmcloudsql.sql_magic.TimeSeriesSchema

The class supports constructing a full SQL query statement

Attributes:
columns_in_unixtime

Return the name of columns whose values are in UNIX timestamp

Methods

format_()

Perform string replacement needed so that the final result is a SQL statement that is accepted by IBM Spark SQL

from_cos_(cos_url[, format_type, alias, …])

FROM <cos-url> STORED AS <format_type> AS type [FIELDS TERMINATED BY delimiter] [AS alias] [, <cos-url> AS type [AS alias]]

from_table_(table[, alias])

FROM <table> [AS alias] [, <table> [AS alias]]

from_view_(sql_stmt)

FROM (<sql>)

get_sql()

Return the current sql string

group_by_(columns)

GROUP BY <columns>

join_cos_(cos_url, condition[, typ, alias])

[typ] JOIN <cos-url> [AS alias]

join_table_(table, condition[, typ, alias])

[typ] JOIN <table> [AS alias] ON <condition>

order_by_(columns)

ORDER BY <columns>

partition_by_(columns)

PARTITIONED BY <columns>

partition_objects_(num_objects)

PARTITIONED INTO <num> OBJECTS

partition_rows_(num_rows)

PARTITIONED INTO <num> ROWS

print_sql()

print() sql string

reset_()

Reset and returns the current sql string

select_(columns)

SELECT <columns>

store_at_(cos_url[, format_type])

INTO <cos-url> STORED AS <type>

where_(condition)

WHERE <condition> [, <condition>]

with_(table_name, sql_stmt)

WITH <table> AS <sql> [, <table> AS <sql>]

format_()[source]

Perform string replacement needed so that the final result is a SQL statement that is accepted by IBM Spark SQL

from_cos_(cos_url, format_type='parquet', alias=None, delimiter=None)[source]

FROM <cos-url> STORED AS <format_type> AS type [FIELDS TERMINATED BY delimiter] [AS alias] [, <cos-url> AS type [AS alias]]

from_table_(table, alias=None)[source]

FROM <table> [AS alias] [, <table> [AS alias]]

from_view_(sql_stmt)[source]

FROM (<sql>)

get_sql()[source]

Return the current sql string

group_by_(columns)[source]

GROUP BY <columns>

join_cos_(cos_url, condition, typ='inner', alias=None)[source]

[typ] JOIN <cos-url> [AS alias]

join_table_(table, condition, typ='inner', alias=None)[source]

[typ] JOIN <table> [AS alias] ON <condition>

NOTE: [typ] is a value in the list below

["INNER", "CROSS", "OUTER", "LEFT", "LEFT OUTER", "LEFT SEMI",
"RIGHT", "RIGHT OUTER", "FULL", "FULL OUTER", "ANTI", "LEFT ANTI"]
order_by_(columns)[source]

ORDER BY <columns>

Parameters:
columns: str

a string representing a comma-separated list of columns

partition_by_(columns)[source]

PARTITIONED BY <columns>

partition_objects_(num_objects)[source]

PARTITIONED INTO <num> OBJECTS

partition_rows_(num_rows)[source]

PARTITIONED INTO <num> ROWS

print_sql()[source]

print() sql string

reset_()[source]

Reset and returns the current sql string

select_(columns)[source]

SELECT <columns>

Parameters:
columns: str

a string representing a comma-separated list of columns

store_at_(cos_url, format_type='CSV')[source]

INTO <cos-url> STORED AS <type>

where_(condition)[source]

WHERE <condition> [, <condition>]

with_(table_name, sql_stmt)[source]

WITH <table> AS <sql> [, <table> AS <sql>]
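
A sketch of composing a statement with the SQLBuilder methods (assuming sqlClient as above, since SQLQuery inherits SQLBuilder; the source COS URLs reuse the samples on this page and the target bucket is a placeholder):

sqlClient.reset_()
sqlClient.select_("OrderID, CompanyName")
sqlClient.from_cos_("cos://us-geo/sql/orders.parquet", format_type="parquet", alias="o")
sqlClient.join_cos_("cos://us-geo/sql/customers.parquet", "c.CustomerID = o.CustomerID", typ="inner", alias="c")
sqlClient.where_("o.ShipCountry = 'Germany'")
sqlClient.store_at_("cos://us-south/<target-bucket>/result/", format_type="CSV")

sqlClient.print_sql()                     # inspect the generated statement
result = sqlClient.run(get_result=True)   # execute it via execute_sql()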

class ibmcloudsql.sql_magic.TimeSeriesSchema[source]

Bases: object

The class tracks the columns that are useful in time-series handling. Currently, it tracks column names whose values are in UNIX time format

Attributes:
columns_in_unixtime

Return the name of columns whose values are in UNIX timestamp

property columns_in_unixtime

Return the name of columns whose values are in UNIX timestamp

class ibmcloudsql.sql_magic.TimeSeriesTransformInput[source]

Bases: object

This class contains methods that support the transformation of user-friendly arguments in time-series functions to IBM CloudSQL-compatible values.

Methods

transform_sql(f)

Generate SQL string

ts_segment_by_time(sql_stmt)

Revise arguments of TS_SEGMENT_BY_TIME function to comply with IBM CloudSQL

classmethod transform_sql(f)[source]

Generate SQL string

Notes

Syntax: https://cloud.ibm.com/docs/sql-query

classmethod ts_segment_by_time(sql_stmt)[source]

Revise arguments of TS_SEGMENT_BY_TIME function to comply with IBM CloudSQL

Notes

The TS_SEGMENT_BY_TIME function supported by IBM CloudSQL accepts values as numbers, which is not user-friendly for units like hours, days, and minutes. SQLBuilder allows constructing the SQL query string using the values below.

1. values: per_hour, hour, per_day, day, per_week, week, minute, Xminute (X is a number divisible by 60)

ts_segment_by_time(ts, per_hour, per_hour)
ts_segment_by_time(ts, hour, hour)
  2. or values: using ISO 8601 durations - https://en.wikipedia.org/wiki/ISO_8601#Durations

    P[n]Y[n]M[n]DT[n]H[n]M[n]S or P[n]W

Examples

ts_segment_by_time(ts, PT1H, PT1H)

into

ts_segment_by_time(ts, 3600000, 3600000)

as ts_segment_by_time operates at the millisecond level, and one hour = 60*60*1000 milliseconds

ibmcloudsql.sql_magic.format_sql(sql_stmt)[source]

format SQL string to ensure proper content for string comparison

Parameters:
sql_stmt: str
ibmcloudsql.sql_magic.print_sql(sql_stmt)[source]

Time-Series SQL Query module

class ibmcloudsql.sql_query_ts.SQLClientTimeSeries(api_key='', instance_crn='', target_cos_url=None, max_concurrent_jobs=4, max_tries=1, iam_max_tries=1, thread_safe=False, client_info='TimeSeries Cloud SQL Query Python')[source]

Bases: SQLQuery.SQLQuery

This class augments SQLClient with time-series functionality

HISTORY: Aug-10-2021: expand _get_ts_datasource_v3()

Attributes:
columns_in_unixtime

Return the name of columns whose values are in UNIX timestamp

cos_session

Get the current COS session.

my_jobs

Return information about jobs already queried via get_job()

project_lib

Project: IBM Watson Studio ProjectLib object

target_url

Return the target COS URL.

thread_safe

Check if SQLQuery is in thread-safe mode.

Methods

add_partitions(table_name, col_names)

Update the table with a partition column having new value.

analyze(job_id[, print_msg])

Provides some insights about the data layout from the current SQL statement

analyze_cos_url(cos_url)

return a namedtuple containing the 3 fields

configure([apikey, instance_crn, cos_out_url])

update the configuration

connect_project_lib(project[, file_name])

Connect to an IBM Watson Studio Project’s COS bucket for its own assets

copy_daily_objects(source_cos_url, …[, …])

Copy all objects from a source location in a per day folder on a target location, with a sliding window buffer of days.

copy_objects(source_path, cos_url)

Upload a file to a given COS URL

create_metaindex(table_name)

Create a metaindex for a given table.

create_partitioned_table(table_name[, …])

Create a partitioned table for data on COS.

create_table(table_name[, cos_url, …])

Create a table for data on COS.

delete_empty_objects(cos_url)

Delete zero-size objects. See list_cos_objects() for further details.

delete_objects(cos_url[, dry_run, confirm, …])

delete all objects stored at the given COS URL

delete_result(jobId)

Delete the COS objects created by a given job-id

describe_table(table_name)

Return the schema of a table.

drop_all_tables()

Delete all created tables in the project.

drop_table([table_name])

Drop a given table.

drop_tables(table_names)

Drop a list of tables.

execute_sql(*args, **kwargs)

Extend the behavior of run_sql(). It is a blocking call that waits for the job to finish (unlike submit_sql()), but it has additional features.

export_job_history([cos_url, …])

Export the most recent jobs to COS URL

format_()

Perform string replacement needed so that the final result is a SQL statement that is accepted by IBM Spark SQL

from_cos_(cos_url[, format_type, alias, …])

FROM <cos-url> STORED AS <format_type> AS type [FIELDS TERMINATED BY delimiter] [AS alias] [, <cos-url> AS type [AS alias]]

from_table_(table[, alias])

FROM <table> [AS alias] [, <table> [AS alias]]

from_view_(sql_stmt)

FROM (<sql>)

get_bucket(cos_url)

return the bucket name from COS URL

get_bucket_info(cos_url)

Return the information of the given bucket

get_cos_summary(url)

Return information for the given COS URL (may include bucket + prefix)

get_endpoint(cos_url)

return the endpoint string from COS URL

get_exact_url(cos_url)

convert COS URL from using alias to exact URL

get_job(jobId)

Return the details of the job-id

get_jobs()

Return the up-to-30 most recent jobs from the given Cloud API

get_jobs_count_with_status(status)

return the number of jobs in the SQL Query server for the given status

get_jobs_with_status(job_id_list, status)

return the list of job_id, among those provided, that have the given status

get_metaindex(table_name)

Get the metaindex of a table.

get_number_running_jobs()

return the number of running jobs in the SQL Query server

get_prefix(cos_url)

return the prefix part from COS URL

get_result(jobId[, pagenumber])

Return the queried data from the given job-id

get_schema_data(cos_url[, typ, dry_run])

Return the schema of COS URL

get_schema_table(table_name)

Return the schema of table.

get_sql()

Return the current sql string

get_ts_datasource(cos_in, key, time_stamp, …)

Prepare the data source for time-series in the next-query, in that the result is broken down into multiple objects using num_objects as the criteria

get_ts_datasource_from_table(table_name, …)

Prepare the data source for time-series in the next-query, in that the result is broken down into multiple objects using num_objects as the criteria

group_by_(columns)

GROUP BY <columns>

human_form_to_machine_form(sql_stmt)

apply magic tricks to convert some useful names, e.g. hour, day, … to the expected numeric value in [ms] - which is the expected input to TimeSeries functions in SQL.

is_a_supported_storage_type(typ)

check if a type is in a supported format

is_valid_cos_url(cos_url)

Validate if a string is COS URL

join_cos_(cos_url, condition[, typ, alias])

[typ] JOIN <cos-url> [AS alias]

join_table_(table, condition[, typ, alias])

[typ] JOIN <table> [AS alias] ON <condition>

list_cos_objects(cos_url[, size_unit, …])

List all objects in the current COS URL. Also, please read the note about the role of the trailing slash in the COS URL.

list_results(jobId[, wait])

NOTE: A single SQL Query can store the queried data in the COS output in multiple objects/partitions

logon([force])

Establish a connection to IBM Cloud.

order_by_(columns)

ORDER BY <columns>

partition_by_(columns)

PARTITIONED BY <columns>

partition_objects_(num_objects)

PARTITIONED INTO <num> OBJECTS

partition_rows_(num_rows)

PARTITIONED INTO <num> ROWS

print_sql()

print() sql string

process_failed_jobs_until_all_completed(…)

re-send those that are failed - due to the time-out mechanism of SQL Query server

read_project_lib_data([file_name])

read the content from the given file (via ProjectLib in IBM Watson Studio’s COS bucket)

recover_table_partitions(table_name)

Update a partitioned table. This step is required after creating a new partitioned table.

rename_exact_result(jobId[, wait])

A SQL Query can store data into partitioned/paginated multiple objects, or single object.

rename_exact_result_joblist(job_list[, wait])

The bulk mode of rename_exact_result method.

reset_()

Reset and returns the current sql string

run([pagesize, get_result])

run the internal SQL statement provided by SQLBuilder using execute_sql()

run_sql(*args, **kwargs)

Submits a SQL job, waits for the job to finish (unlike submit_sql()) and return the result as Pandas DataFrame.

select_(columns)

SELECT <columns>

set_tracking_file(file_name)

provides the file name which is used for tracking multiple SQL requests

show_tables([target_cos_url, pattern])

List the available Hive Metastore.

sql_ui_link()

both print out and also return the string containing SQL Query URL

store_at_(cos_url[, format_type])

INTO <cos-url> STORED AS <type>

submit([pagesize])

run the internal SQL statement that you created using the APIs provided by SQLBuilder

submit_and_track_sql(sql_stmt[, pagesize, …])

Each SQL Query instance is limited in the number of SQL queries that it can handle at a time. This can be a problem when you launch many SQL Query jobs, as the limit may prevent you from completing all of them in one session. The max_tries option when creating the SQL Query client object allows you to re-send a job, but it is still limited to one session. Session time is often limited when using the SQL Query client via Watson Studio, i.e. you lose the session after having no interaction with the notebook for a period of time.

submit_sql(*args, **kwargs)

Asynchronous call - submit and quickly return the job_id.

wait_for_job(jobId[, sleep_time])

Wait for the given job to complete. It is possible that a job failed because of a Spark internal error that does not report any status, so “unknown” is added for such cases.

where_(condition)

WHERE <condition> [, <condition>]

with_(table_name, sql_stmt)

WITH <table> AS <sql> [, <table> AS <sql>]

write_project_lib_data([file_name])

Write the content to the given file (via ProjectLib in IBM Watson Studio’s COS bucket).

export_tags_for_cos_objects

get_session

execute_sql(*args, **kwargs)[source]

Extend the behavior of run_sql(). It is a blocking call that waits for the job to finish (unlike submit_sql()), but it has the following features:

  1. returning the data (a Pandas DataFrame) is optional (controlled by the get_result parameter), to help avoid overloading the Python runtime’s memory.

    This is also useful when you run SQL statements such as DDLs that don’t produce any result.

  2. it returns a namedtuple, in which result.data is what run_sql() returns and result.job_id is what submit_sql() returns

  3. it raises an exception for a failed job

Parameters:
sql_stmt: str

the SQL statement to run

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

get_result: bool, optional (default=False)

When False (the default), only the job_id is returned, though the call still waits for the job’s completion; the data can be retrieved later using get_result(job_id, pagenumber). When True, the data is returned as well.

Returns:
namedtuple [data, job_id]

With get_result=True, the behavior is like run_sql(): the returned data is materialized in memory as a pd.DataFrame. The default behavior is the opposite, to avoid unintended memory overload.

Raises:
KeyError

when information about a failed job is missing (job_status, job_id, error, error_message)

SqlQueryFailException

when the SQL query fails, e.g. a time-out on the server side
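
A minimal usage sketch (the API key, instance CRN and COS URLs are placeholders, not values from this documentation):

import ibmcloudsql

sqlClient = ibmcloudsql.SQLQuery(api_key="<API_KEY>",
                                 instance_crn="<SQL_QUERY_CRN>",
                                 target_cos_url="cos://us-geo/target-bucket/results/")
sqlClient.logon()

result = sqlClient.execute_sql(
    "SELECT * FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET LIMIT 10",
    get_result=True)          # also materialize the data; by default only the job_id is kept
print(result.job_id)          # what submit_sql() would return
print(result.data.head())     # what run_sql() would return (a Pandas DataFrame)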

get_ts_datasource(cos_in, key, time_stamp, observation, cos_out, granularity='raw', where_clause='', ops='avg', dry_run=False, keep_col_names: bool = True, cast_observation=None, num_objects=20, print_warning=True)[source]

Prepare the data source for a time-series query; the result is broken down into multiple objects, using num_objects as the criterion.

It returns the data source with 3 columns:
  • keep_col_names = False: the fixed column names below are used

    field_name, time_stamp, observation

  • keep_col_names = True: the column names passed via the arguments are kept

    <key>, <time_stamp>, <observation>

Parameters:
cos_in: str

The data source in “COS_URL stored as <format>”

key: str

The column name being used as the key

time_stamp: str

The column name being used as timestick

observation: str

The column name being used as value

cast_observation: str, optional (default=None)

The type to cast the observation column to

cos_out: str

The COS URL where the data is copied to, for later use as a data source

granularity: str

one of “raw”, “per_min”, “per_<x>min”, “per_sec”, “per_<x>sec”, where <x> is a number that divides 60 evenly, e.g. 10, 15

dry_run: bool, optional

When True, the internally generated SQL statement is returned and no job is submitted.

num_objects: int, optional

The number of objects to be created for storing the data

print_warning: bool, default=True

Whether to print a warning

keep_col_names: bool, optional (default=True)

By default, all 3 original column names are kept. If set to False, they are mapped to field_name (for key), time_stamp and observation, respectively.

Returns:
str

The COS URL where the data, with 3 fields (key, time_stamp, observation), is stored; it can be ingested into a time series via TIME_SERIES_FORMAT(key, timestick, value)
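
A sketch under assumed names (the COS URLs and the columns device_id, reading_time, temperature are placeholders), reusing the sqlClient object from the execute_sql() sketch above:

ts_source_url = sqlClient.get_ts_datasource(
    cos_in="cos://us-geo/source-bucket/iot.parquet STORED AS PARQUET",
    key="device_id",
    time_stamp="reading_time",
    observation="temperature",
    cos_out="cos://us-geo/target-bucket/ts-source/",
    granularity="per_min",
    num_objects=20)
# ts_source_url points to the prepared 3-column data for a subsequent time-series query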

get_ts_datasource_from_table(table_name, key, time_stamp, observation, cos_out, granularity='raw', where_clause='', ops='avg', dry_run=False, keep_col_names: bool = True, cast_observation=None, num_objects=20, print_warning=True)[source]

Prepare the data source for a time-series query; the result is broken down into multiple objects, using num_objects as the criterion.

It returns the data source with 3 columns:
  • keep_col_names = False: the fixed column names below are used

    field_name, time_stamp, observation

  • keep_col_names = True: the column names passed via the arguments are kept

    <key>, <time_stamp>, <observation>

Parameters:
table_name: str

The catalog table name. NOTE: use either a COS URL (cos_in in get_ts_datasource()) or a catalog table (this method), but not both.

key: str

The column name being used as the key

time_stamp: str

The column name being used as timestick

observation: str

The column name being used as value

cast_observation: str, optional (default=None)

The type to cast the observation column to

cos_out: str

The COS URL where the data is copied to, for later use as a data source

granularity: str

one of “raw”, “per_min”, “per_<x>min”, “per_sec”, “per_<x>sec”, where <x> is a number that divides 60 evenly, e.g. 10, 15

dry_run: bool, optional

When True, the internally generated SQL statement is returned and no job is submitted.

num_objects: int, optional

The number of objects to be created for storing the data

print_warning: bool, default=True

Whether to print a warning

keep_col_names: bool, optional (default=True)

By default, all 3 original column names are kept. If set to False, they are mapped to field_name (for key), time_stamp and observation, respectively.

Returns:
str

The COS URL where the data, with 3 fields (key, time_stamp, observation), is stored; it can be ingested into a time series via TIME_SERIES_FORMAT(key, timestick, value)
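
The same preparation, reading from a catalog table instead of a COS URL (the table and column names are placeholders):

ts_source_url = sqlClient.get_ts_datasource_from_table(
    table_name="iot_readings",
    key="device_id",
    time_stamp="reading_time",
    observation="temperature",
    cos_out="cos://us-geo/target-bucket/ts-source/")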

human_form_to_machine_form(sql_stmt)[source]

Convert human-readable unit names, e.g. hour, day, …, to the numeric value in [ms] expected by the TimeSeries functions in SQL.
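
A hedged illustration (the SQL statement is hypothetical; the exact set of names that get substituted depends on the library):

# unit names such as hour are rewritten to their value in milliseconds (1 hour = 3600000 ms)
sql = sqlClient.human_form_to_machine_form(
    "SELECT TS_SEGMENT_BY_TIME(ts, hour, hour) FROM table1")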

run_sql(*args, **kwargs)[source]

Submit a SQL job, wait for it to finish (unlike submit_sql()), and return the result as a Pandas DataFrame.

Parameters:
sql_text: str

the SQL statement to run

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

Returns:
pd.DataFrame with the query results.
Raises:
CosUrlNotFoundException

the COS URL is not valid

CosUrlInaccessibleException

the COS URL is inaccessible - no access granted to the given API key

SqlQueryInvalidFormatException

the format provided to COS URL is incorrect

KeyError

the returned error message does not have job_id, error, or error_message

Exception

unexpected exception
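
A minimal sketch, reusing the statement from the submit_sql() curl example below and the sqlClient object from the execute_sql() sketch above:

df = sqlClient.run_sql(
    "SELECT firstname FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET "
    "WHERE EMPLOYEEID=5 "
    "INTO cos://us-geo/target-bucket/q1-results")
print(df)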

submit_sql(*args, **kwargs)[source]

Asynchronous call - submit and quickly return the job_id.

Parameters:
sql_stmt: str

SQL Query string

pagesize: int, optional

an integer indicating the number of rows for each partition/page [using PARTITIONED EVERY <pagesize> ROWS syntax]

stored_as: string

The storage format to use, only when ‘INTO … STORED AS …’ is not provided in the SQL statement

Returns:
str

job_id

Raises:
RateLimitedException

when the SQL Query instance is already serving its maximum number of concurrent requests

SyntaxError

raised when the underlying error is either a KeyError or an HTTPError

Examples

curl -XPOST \
    --url "https://api.sql-query.cloud.ibm.com/v2/sql_jobs?instance_crn=YOUR_SQL_QUERY_CRN" \
    -H "Accept: application/json" \
    -H "Authorization: Bearer YOUR_BEARER_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{"statement":"SELECT firstname FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET WHERE EMPLOYEEID=5 INTO cos://us-geo/target-bucket/q1-results" }'

NOTE:

1. All the headers (-H) can be put into a dictionary and passed to the headers argument of requests.post() API.

2. All the data (-d option) is put into a dictionary and passed to the json argument of requests.post() API.

  • ‘statement’: the value is the full SQL statement as a string

  • ‘resultset_target’ (optional): only needed when there is no ‘INTO’ clause in the query string; the value must be the output COS URL

# Roughly what submit_sql() does internally, i.e. the Python equivalent of the curl call above.
# 'self' and 'ro_credentials' are attributes of the SQLQuery client: 'ro_credentials.token'
# holds the IAM bearer token and 'self.instance_crn' the SQL Query instance CRN.
import requests

sqlData = {'statement': sql_stmt}
request_headers = {'Content-Type': 'application/json'}
request_headers.update({'Accept': 'application/json'})
request_headers.update({'User-Agent': self.user_agent})
request_headers.update({'authorization': 'Bearer {}'.format(ro_credentials.token)})
response = requests.post(
    "https://api.sql-query.cloud.ibm.com/v2/sql_jobs?instance_crn={}".format(self.instance_crn),
    headers=request_headers,
    json=sqlData)
"""
{
    "errors": [
        {
        "code": "bad_request",
        "message": "Target url specified in parameter resultset_target and as part of into clause in statement"
        }
    ]
}
{
    "job_id": "e2adca0a-9247-4cfa-ac58-db4b2bc33a01",
    "status": "queued"
}
{
    "status_code": 429,
    "errors": [
        {
        "code": "too_many_requests",
        "message": "This instance is currently running its maximum number of query jobs. Try again later, after at least one of the currently running jobs has completed."
        }
    ]
}
"""
# error code information: https://cloud.ibm.com/apidocs/sql-query
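
The equivalent asynchronous flow through the SDK (a sketch; it reuses the sqlClient object from the earlier sketch and assumes wait_for_job() returns the final job status):

job_id = sqlClient.submit_sql(
    "SELECT firstname FROM cos://us-geo/sql/employees.parquet STORED AS PARQUET "
    "WHERE EMPLOYEEID=5 "
    "INTO cos://us-geo/target-bucket/q1-results")
status = sqlClient.wait_for_job(job_id)   # may also be "unknown", see wait_for_job()
if status == "completed":
    df = sqlClient.get_result(job_id)     # fetch the stored result as a Pandas DataFrame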
ibmcloudsql.sql_query_ts.test_run()[source]

Utilities module

class ibmcloudsql.utilities.IBMCloudAccess(cloud_apikey='', client_info='', staging=False, thread_safe=False, session=None, iam_max_tries: int = 1)[source]

Bases: object

This class provides APIs to get credentials to interact with IBM Cloud services, e.g. COS, SQL Query

Parameters:
cloud_apikey: str, optional

an account-level API key [manage/Access (IAM)/IBM Cloud API keys]

client_info: str, optional

a description

thread_safe: bool, optional

a new Session object is created if not provided

session: ibm_boto3.session.Session, optional

provide a Session object so that it can be reused

staging: bool, optional

if True, then uses the test IAM endpoint

iam_max_tries: int, optional

Number of tries to connect to IAM service

Attributes:
cos_session

Get the current COS session.

thread_safe

Check if SQLQuery is in thread-safe mode.

Methods

configure([cloud_apikey])

Update Cloud API key

logon([force])

Establish a connection to IBM Cloud.

get_session

configure(cloud_apikey=None)[source]

Update Cloud API key

property cos_session

Get the current COS session.

get_session()[source]
logon(force=False)[source]

Establish a connection to IBM Cloud.

Raises:
AttributeError:

The exception is raised when the token cannot be retrieved using the current credential.

Notes

An IAM token is needed for any operation on IBM Cloud services (e.g. COS). A new IAM token is created after 300 seconds. A token is valid for 3600 seconds.
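
A minimal sketch (the API key is a placeholder):

from ibmcloudsql.utilities import IBMCloudAccess

access = IBMCloudAccess(cloud_apikey="<API_KEY>", client_info="my-client")
access.logon()                  # obtains an IAM token; valid for 3600 seconds, renewed after 300 seconds
session = access.get_session()  # the underlying ibm_boto3 session, which can be reused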

property thread_safe

Check if SQLQuery is in thread-safe mode.

ibmcloudsql.utilities.confirm_action(action_name=None)[source]

Ask the user to enter Y or N (case-insensitive). Returns True (bool) if the answer is Y.

ibmcloudsql.utilities.rename_keys(d, keys)[source]

Rename keys from d that are present as a key in keys by the corresponding value in keys.

Parameters:
d: dict

a dict whose keys need to be updated

keys: dict

a dict that maps an old key to a new key

Returns:
dict

an updated dict
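
A small illustration (whether the input dict is modified in place or copied is not specified here):

from ibmcloudsql.utilities import rename_keys

d = {"old_name": 1, "other": 2}
result = rename_keys(d, {"old_name": "new_name"})
# expected: {"new_name": 1, "other": 2}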

ibmcloudsql.utilities.static_vars(**kwargs)[source]

Use this as a decorator on a function to assign a default value to a static variable, e.g. var_stat

@static_vars(var_stat=0)
def func():
    func.var_stat = func.var_stat + 1

IMPORTANT: inside the function, the static variable must be accessed through the function name, e.g. func.var_stat
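
For example, calling the decorated function defined above twice shows the counter persisting across calls through the function attribute:

func()
print(func.var_stat)   # 1
func()
print(func.var_stat)   # 2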

Module contents