Source code for apyt.io.sql

"""
The APyT SQL module
===================

This module provides a lightweight Python interface to the APyT SQL database. It
implements convenience functions for **downloading**, **querying**, and
**updating** measurement data stored in the central database.

The design goal of this module is to abstract away low-level HTTP/SQL details
and provide a consistent API for accessing experimental data in Python. In
particular:

* All functions can use explicit authorization credentials (username, password)
  if required for database access.
* Each function returns both the HTTP status code of the request and the
  retrieved or updated content (if applicable).
* Downloaded measurement data can optionally be cached on disk in NumPy `.npy`
  files to reduce repeated network requests.


Typical use cases
-----------------

- Fetching structured measurement data from the database for analysis.
- Updating metadata fields (e.g., experiment or evaluation parameters) of
  existing records.
- Executing SQL queries for specific columns and records.


List of functions
-----------------

* :func:`download`: Download and (optionally) cache measurement data from the
  database.
* :func:`dump_record`: Dump a single SQL database record to a JSON file.
* :func:`load_record`: Load a JSON record from file and upload it to the SQL
  database.
* :func:`query`: Query one or more fields from a SQL database record.
* :func:`update`: Update a specific field of a database record.


Implementation notes
--------------------

- Database access is performed via HTTP requests to PHP scripts
  (``download.php``, ``update.php``, ``query.php``) hosted at the configured
  database URL (see :func:`apyt.io.config.get_setting`).
- Binary measurement datasets are streamed and converted directly into NumPy
  arrays.
- Error handling and logging are integrated throughout; failed requests return
  the corresponding HTTP status code.


.. sectionauthor:: Sebastian M. Eich <Sebastian.Eich@imw.uni-stuttgart.de>
.. codeauthor::    Sebastian M. Eich <Sebastian.Eich@imw.uni-stuttgart.de>
"""
#
#
__version__ = "0.1.0"
__all__ = ["download", "dump_record", "load_record", "query", "update"]
#
#
# import modules
import fujson
import json
import logging
import numpy as np
import requests
import warnings
#
# import individual functions
from apyt.io.config import _RAW_FILE_DTYPE, get_setting
from datetime import datetime
from html2text import HTML2Text
from os.path import isfile
from pathlib import Path
from time import sleep
#
#
#
#
# set up logger
logger = logging.getLogger(__name__)
#
#
#
#
################################################################################
#
# public functions
#
################################################################################
[docs]def download(id, use_cache = False, auth = None): """ Download and (optionally) cache measurement data from the database. This function retrieves measurement data stored in the APyT SQL database. Data are returned as a structured NumPy array with predefined fields such as detector position, voltage signals, and timing information. To reduce repeated network requests, measurement data can be cached on disk in a local ``.npy`` file, identified by the database record's ``custom_id``. If caching is enabled, subsequent calls will load data directly from the cache instead of re-downloading. Parameters ---------- id : int The measurement ID of the record in the SQL database. use_cache : bool, optional If ``True``, attempt to load data from a local cache file. If the cache does not exist, data will be downloaded and then written to disk. If ``False``, data are always fetched from the database. The latter is the default behavior. auth : tuple of (str, str), optional A tuple ``(username, password)`` providing authorization credentials. If ``None``, access to the database may fail, depending on its access/security configuration. Returns ------- status : int HTTP status code returned by the database request. - ``200`` indicates success. - Other codes indicate failure. data : numpy.ndarray or None A structured NumPy array containing the measurement events with the following fields: - ``U_base`` : float32 — Base voltage - ``U_pulse`` : float32 — Pulse voltage - ``U_reflectron`` : float32 — Reflectron voltage - ``x_det`` : float32 — Detector *x*-position - ``y_det`` : float32 — Detector *y*-position - ``tof`` : float32 — Time-of-flight - ``epoch`` : int32 — Epoch time - ``pulse_num`` : uint32 — Pulse number Returns ``None`` if the download or query fails. Notes ----- - Creates or reads a cache file ``<custom_id>.npy`` in the current working directory when ``use_cache = True``. - Requires network access to the SQL database unless loading from cache. - Logs progress and errors using the module-level logger. """ # # # get custom ID from database status, record = query(id, "custom_id", auth = auth) if status != requests.codes.ok: logger.error(f"Failed to retrieve custom ID for record {id}.") return status, None custom_id = record['custom_id'] # # # set cache file name cache_file = f"{custom_id}.npy" # # # # # download data from database if use_cache == False or isfile(cache_file) == False: # download file from database logger.info( f"Downloading data for custom ID \"{custom_id}\" from database…" ) r = _request( get_setting("database.url") + "/download.php", {'id' : id}, auth ) if r.status_code != requests.codes.ok: logger.error( f"Download failed for record {id} (HTTP {r.status_code})." ) return r.status_code, None # # # copy buffer content to numpy array data = np.frombuffer(r.content, dtype = _RAW_FILE_DTYPE).copy() del r # # # get data from cache file if use_cache == True: # write cache file if it does not exist if isfile(cache_file) == False: logger.info(f"Writing data to cache file \"{cache_file}\".") with open(cache_file, 'wb') as f: np.save(f, data) # read cache file else: logger.info(f"Reading data from cache file \"{cache_file}\".") with open(cache_file, 'rb') as f: data = np.load(f) # # # # # return data logger.info(f"Data file contains {len(data)} events.") return requests.codes.ok, data
# # # #
[docs]def dump_record(id, file_name = None): """ Dump a single SQL database record to a JSON file. This function retrieves a measurement record from the SQL database and writes its content to a JSON file. If no output filename is provided, one will be generated automatically based on the record's ``custom_id`` and the current timestamp. Parameters ---------- id : int The measurement ID of the record to retrieve. file_name : str or Path, optional The name of the output file. If ``None``, the filename is constructed as ``<custom_id>_<YYYYMMDD_HHMMSS>.rec``. Returns ------- str or None The path to the created JSON file, or ``None`` if the query failed. Warns ----- UserWarning If the SQL query does not succeed. """ # # # retrieve database record logger.info(f"Retrieving record {id} from SQL database.") status, record = query(id, "*") if status != requests.codes.ok: warnings.warn( f"Failed to retrieve record {id} (status={status}).", UserWarning ) return None # # # generate output filename if not provided if file_name is None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') file_name = f"{record['custom_id']}_{timestamp}.rec" # # # write JSON data file_path = Path(file_name) logger.info(f"Writing record {id} to \"{file_path}\".") with open(file_path, "w", encoding = "utf-8") as f: fujson.dump( record, f, ensure_ascii = False, float_format = ".9e", indent = 4 ) # # # return path to the created JSON file return str(file_path)
# # # #
[docs]def load_record(id, file_name, auth): """ Load a JSON record from file and upload it to the SQL database. This function reads a JSON file containing record data and updates the corresponding entry in the SQL database. Each key--value pair is uploaded individually via the :func:`update` function, except for read-only fields. Parameters ---------- id : int The measurement ID of the record in the SQL database. file_name : str or Path Path to the JSON file containing the record data. auth : tuple of (str, str) Authentication credentials (username, password) for the SQL database. Returns ------- bool ``True`` if all fields were uploaded successfully, ``False`` if the input file is missing or if any upload fails. Warns ----- UserWarning - If the input file does not exist. - If uploading any field fails. """ # # # check file existence file_path = Path(file_name) if not file_path.is_file(): warnings.warn(f"File \"{file_name}\" does not exist.", UserWarning) return False # # # read JSON data from file logger.info(f"Reading record from file \"{file_name}\".") with open(file_path, "r", encoding = "utf-8") as f: record = json.load(f) # # # upload parameters to SQL database ro_keys = ('id', 'last_modified', 'file', 'checksum', 'user') for key, value in record.items(): # skip read-only and empty fields if key in ro_keys or value == "": continue # # # update field status, response = update(id, key, value, auth = auth) if status != requests.codes.ok or response != "OK": warnings.warn( f"Failed to upload \"{key}\" for record {id} " f"(status={status}, response={response}).", UserWarning ) return False # # # throttle queries sleep(0.1) # # # return True on success return True
# # # #
[docs]def query(id, keys, auth = None): """ Query one or more fields from a SQL database record. This function retrieves specific fields of a measurement entry from the APyT SQL database. Results are returned as a dictionary mapping field names to values. Parameters ---------- id : int The measurement ID of the record in the SQL database. keys : str or iterable of str The field(s) to retrieve from the database entry. If a single string is provided, it is automatically converted to a tuple. ``"*"`` retrieves all fields from the database entry. auth : tuple of (str, str), optional A tuple ``(username, password)`` providing authorization credentials. If ``None``, access to the database may fail, depending on its access/security configuration. Returns ------- status : int HTTP status code returned by the database request. - ``200`` indicates success. - Other codes indicate failure (see ``requests.codes``). result : dict or None Dictionary containing the requested fields. Returns ``None`` if the request fails or the record is missing. Notes ----- - If the field ``'custom_id'`` is present, it is converted to ``str`` (in case of a numeric-only custom ID). - Errors are logged with the module-level logger. """ # # # build SQL query if keys == "*": sql = f"SELECT * FROM data WHERE id = {id}" elif isinstance(keys, str): sql = f"SELECT id, {keys} FROM data WHERE id = {id}" else: sql = f"SELECT id, {', '.join(keys)} FROM data WHERE id = {id}" payload = {"format": "json", "sql": sql} # # # get request response r = _request(get_setting("database.url") + "/query.php", payload, auth) if r.status_code != requests.codes.ok: logger.error(f"Query failed for record {id} (HTTP {r.status_code}).") return r.status_code, None # # # convert query results to dictionary try: record = r.json()[str(id)] except ValueError as e: logger.error(f"Failed to parse JSON response for record {id}: {e}") return r.status_code, None # # # ensure custom_id is always string if 'custom_id' in record: record['custom_id'] = str(record['custom_id']) # # # return query results dictionary return requests.codes.ok, record
# # # #
[docs]def update(id, key, value, auth = None, method = 'GET'): """ Update a specific field of a database record. This function modifies an existing entry in the APyT SQL database by updating a single key–value pair. The update request is sent via HTTP (either GET or POST) to the configured database endpoint. Parameters ---------- id : int The measurement ID of the record in the SQL database. key : str The field name to update in the database entry. value : str The new value for the specified field. auth : tuple of (str, str), optional A tuple ``(username, password)`` providing authorization credentials. If ``None``, access to the database may fail, depending on its access/security configuration. method : {'GET', 'POST'}, default 'GET' The HTTP request method to use when submitting the update. Returns ------- status : int HTTP status code returned by the database request. - ``200`` indicates success. - Other codes indicate failure. response : str or None The raw text returned by the database endpoint. Returns ``None`` if the request fails. Notes ----- - The database endpoint returns ``"OK"`` when the update is successful. - Errors returned by the database are converted to readable text and logged. """ # # # check for valid method if method not in ('GET', 'POST'): logger.error(f"Invalid method '{method}'. Must be 'GET' or 'POST'.") return requests.codes.bad_request, None # # # floats in "parameters" field shall be in scientific notation if key == "parameters": value = fujson.dumps(value, ensure_ascii = False, float_format = ".9e") # # # update database record (value needs to be quoted) logger.info(f"Updating \"{key}\" for record {id} in database… ") r = _request( get_setting("database.url") + "/update.php", {'id': id, 'key': key, 'value': f"'{value}'"}, auth, method ) if r.status_code != requests.codes.ok: logger.error( f"Update request failed for record {id} (HTTP {r.status_code})." ) return r.status_code, None # # # check server response response = r.text if response == "OK": logger.info(f"SQL update successful for record {id}.") else: logger.error( "SQL update failed. Server returned:\n\n" + HTML2Text().handle(response) ) # # # return response return requests.codes.ok, response
# # # # ################################################################################ # # private module-level functions # ################################################################################ def _request(request_url, payload, auth = None, method = "GET"): """ Internal helper: send an HTTP request to the database. Parameters ---------- request_url : str Full URL of the database endpoint. payload : dict Dictionary of request parameters or data. auth : tuple of (str, str), optional Authorization credentials. method : {'GET', 'POST'}, default 'GET' HTTP request method. Returns ------- requests.Response The HTTP response object. """ # # # get request response try: if method == "GET": r = requests.get( request_url, params = payload, auth = auth, timeout = 30 ) elif method == "POST": r = requests.post( request_url + "?method=POST", data = payload, auth = auth, timeout = 30 ) else: raise ValueError( f"Unsupported request method \"{method}\". " "Expected \"GET\" or \"POST\"." ) except requests.RequestException as e: logger.error(f"Request {method} {request_url} failed: {e}") raise # # # check request status code if r.status_code == requests.codes.ok: logger.debug(f"Request {method} {request_url} succeeded.") else: logger.error( f"Request {method} {request_url} failed (HTTP {r.status_code})." ) # # # return response return r