mirror of
https://github.com/dtomlinson91/panaetius.git
synced 2025-12-22 13:05:45 +00:00
adding latest
This commit is contained in:
6
old_src/panaetius/__init__.py
Normal file
6
old_src/panaetius/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from panaetius.config_inst import CONFIG
|
||||
from .config import Config
|
||||
from .library import set_config
|
||||
from panaetius.header import __header__
|
||||
import panaetius.logging
|
||||
from panaetius.logging import logger as logger
|
||||
1
old_src/panaetius/__version__.py
Normal file
1
old_src/panaetius/__version__.py
Normal file
@@ -0,0 +1 @@
|
||||
__version__ = '1.0.2'
|
||||
178
old_src/panaetius/config.py
Normal file
178
old_src/panaetius/config.py
Normal file
@@ -0,0 +1,178 @@
|
||||
from typing import Callable, Union
|
||||
import os
|
||||
import toml
|
||||
|
||||
from panaetius.library import export
|
||||
from panaetius.header import __header__
|
||||
from panaetius.db import Mask
|
||||
|
||||
# __all__ = ['Config']
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
"""Handles the config options for the module and stores config variables
|
||||
to be shared.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
config_file : dict
|
||||
Contains the config options. See
|
||||
:meth:`~panaetius.config.Config.read_config`
|
||||
for the data structure.
|
||||
deferred_messages : list
|
||||
A list containing the messages to be logged once the logger has been
|
||||
instantiated.
|
||||
Mask : panaetius.db.Mask
|
||||
Class to mask values in a config file.
|
||||
module_name : str
|
||||
A string representing the module name. This is added in front of all
|
||||
envrionment variables and is the title of the `config.toml`.
|
||||
path : str
|
||||
Path to config file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : str
|
||||
Path to config file
|
||||
"""
|
||||
|
||||
def __init__(self, path: str, header: str = __header__) -> None:
|
||||
"""
|
||||
See :class:`~panaetius.config.Config` for parameters.
|
||||
"""
|
||||
self.path = os.path.expanduser(path)
|
||||
self.header = header
|
||||
self.deferred_messages = []
|
||||
self.config_file = self.read_config(path)
|
||||
self.module_name = self.header.lower()
|
||||
self.Mask = Mask
|
||||
|
||||
def read_config(self, path: str, write: bool = False) -> Union[dict, None]:
|
||||
"""Reads the toml config file from `path` if it exists.
|
||||
|
||||
"""
|
||||
|
||||
path += 'config.toml' if path[-1] == '/' else '/config.toml'
|
||||
path = os.path.expanduser(path)
|
||||
if not write:
|
||||
try:
|
||||
with open(path, 'r+') as config_file:
|
||||
config_file = toml.load(config_file)
|
||||
self.defer_log(f'Config file found at {path}')
|
||||
return config_file
|
||||
except FileNotFoundError:
|
||||
self.defer_log(f'Config file not found at {path}')
|
||||
else:
|
||||
try:
|
||||
with open(path, 'w+') as config_file:
|
||||
config_file = toml.load(config_file)
|
||||
self.defer_log(f'Config file found at {path}')
|
||||
return config_file
|
||||
except FileNotFoundError:
|
||||
self.defer_log(f'Config file not found at {path}')
|
||||
|
||||
def get(
|
||||
self,
|
||||
key: str,
|
||||
default: str = None,
|
||||
cast: Callable = None,
|
||||
mask: bool = False,
|
||||
) -> Union[str, None]:
|
||||
"""Retrives the config variable from either the `config.toml` or an
|
||||
environment variable. Will default to the default value if nothing
|
||||
is found
|
||||
|
||||
Parameters
|
||||
----------
|
||||
key : str
|
||||
Key to the configuration variable. Should be in the form
|
||||
`panaetius.variable` or `panaetius.header.variable`.
|
||||
When loaded, it will be accessable at
|
||||
`Config.panaetius_variable` or
|
||||
`Config.panaetius_header_variable`.
|
||||
default : str, optional
|
||||
The default value if nothing is found. Defaults to `None`.
|
||||
cast : Callable, optional
|
||||
The type of the variable. E.g `int` or `float`. Should reference
|
||||
the type object and not as string. Defaults to `None`.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Any
|
||||
Will return the config variable if found, or the default.
|
||||
"""
|
||||
env_key = f"{self.header.upper()}_{key.upper().replace('.', '_')}"
|
||||
|
||||
try:
|
||||
# look in the config.toml
|
||||
if len(key.split('.')) == 2:
|
||||
# look for subsections
|
||||
# print(mask)
|
||||
if mask:
|
||||
# print('mask', key)
|
||||
value = self.Mask(
|
||||
self.path, self.config_file, key
|
||||
).get_value()
|
||||
else:
|
||||
# print('no-mask')
|
||||
section, name = key.lower().split('.')
|
||||
value = self.config_file[self.module_name][section][name]
|
||||
self.defer_log(f'{env_key} found in config.toml')
|
||||
else:
|
||||
# print('valueerror')
|
||||
# look under top level module self.header
|
||||
# key = f'{self.module_name}.key'
|
||||
if mask:
|
||||
# key = f'{self.header}.{key}'
|
||||
# print(f'mask key={key}')
|
||||
value = self.Mask(
|
||||
self.path, self.config_file, key
|
||||
).get_value()
|
||||
else:
|
||||
name = key.lower()
|
||||
value = self.config_file[self.module_name][name]
|
||||
self.defer_log(f'{env_key} found in config.toml')
|
||||
# finally:
|
||||
try:
|
||||
# return if found in config.toml
|
||||
return cast(value) if cast else value
|
||||
except UnboundLocalError:
|
||||
# pass if nothing was found
|
||||
# print('unbound error')
|
||||
pass
|
||||
except KeyError:
|
||||
# print('key error')
|
||||
self.defer_log(f'{env_key} not found in config.toml')
|
||||
except TypeError:
|
||||
# print('type error')
|
||||
self.defer_log(f'{env_key} not found in config.toml')
|
||||
|
||||
# look for an environment variable
|
||||
value = os.environ.get(env_key.replace("-", "_"))
|
||||
|
||||
if value is not None:
|
||||
self.defer_log(f'{env_key} found in an environment variable')
|
||||
else:
|
||||
# fall back to default
|
||||
self.defer_log(f'{env_key} not found in an environment variable.')
|
||||
value = default
|
||||
self.defer_log(f'{env_key} set to default {default}')
|
||||
return cast(value) if cast else value
|
||||
|
||||
def defer_log(self, msg: str) -> None:
|
||||
"""Populates a list `Config.deferred_messages` with all the events to
|
||||
be passed to the logger later if required.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
msg : str
|
||||
The message to be logged.
|
||||
"""
|
||||
self.deferred_messages.append(msg)
|
||||
|
||||
def reset_log(self) -> None:
|
||||
"""Empties the list `Config.deferred_messages`.
|
||||
"""
|
||||
del self.deferred_messages
|
||||
self.deferred_messages = []
|
||||
9
old_src/panaetius/config_inst.py
Normal file
9
old_src/panaetius/config_inst.py
Normal file
@@ -0,0 +1,9 @@
|
||||
import os
|
||||
|
||||
from panaetius.header import __header__
|
||||
from panaetius.config import Config
|
||||
|
||||
|
||||
DEFAULT_CONFIG_PATH = f"~/.config/{__header__.lower()}"
|
||||
CONFIG_PATH = os.environ.get(f"{__header__.upper()}_CONFIG_PATH", DEFAULT_CONFIG_PATH)
|
||||
CONFIG = Config(CONFIG_PATH)
|
||||
256
old_src/panaetius/db.py
Normal file
256
old_src/panaetius/db.py
Normal file
@@ -0,0 +1,256 @@
|
||||
from os import path, urandom
|
||||
import hashlib
|
||||
from typing import Tuple
|
||||
import toml
|
||||
import io
|
||||
|
||||
from pylite.simplite import Pylite
|
||||
|
||||
from panaetius.header import __header__ as __header__
|
||||
import panaetius
|
||||
|
||||
|
||||
class Mask:
|
||||
|
||||
"""Class to handle masking sensitive values in a config file
|
||||
|
||||
Attributes
|
||||
----------
|
||||
config_contents : dict
|
||||
A dict containing the contents of the config file.
|
||||
config_path : str
|
||||
The path to the config file.
|
||||
config_var : str
|
||||
The key corresponding to the config entry.
|
||||
database : Pylite
|
||||
A Pylite instance for the datbase.
|
||||
entry : str
|
||||
The result from the config file. Could either be a hash or the raw
|
||||
value.
|
||||
header : str
|
||||
The __header__ which denotes where the config file is stored.
|
||||
name : str
|
||||
The key of the entry in the config file.
|
||||
result : str
|
||||
The value of the entry in the config file.
|
||||
table_name : str
|
||||
The sqlite table name. Defaults to the __header__ value.
|
||||
"""
|
||||
|
||||
@property
|
||||
def hash(self):
|
||||
"""Property to determine the hash of a config entry.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
The hash as a bytes object.
|
||||
"""
|
||||
try:
|
||||
if not self._hash_exists:
|
||||
pass
|
||||
except AttributeError:
|
||||
self._hash = hashlib.pbkdf2_hmac(
|
||||
'sha256',
|
||||
self.entry[self.name].encode('utf-8'),
|
||||
self.salt,
|
||||
100000,
|
||||
dklen=12,
|
||||
)
|
||||
self._hash_exists = True
|
||||
finally:
|
||||
return self._hash
|
||||
|
||||
@property
|
||||
def salt(self):
|
||||
"""Property to detemine a random salt to use in creation of the hash.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
The salt as a bytes object.
|
||||
"""
|
||||
self._salt = urandom(32)
|
||||
return self._salt
|
||||
|
||||
@staticmethod
|
||||
def as_string(obj: bytes) -> str:
|
||||
"""Static method to return a string from a bytes object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
obj : bytes
|
||||
Bytes object to be converted to a string.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
The bytes object as a string.
|
||||
"""
|
||||
return bytes.hex(obj)
|
||||
|
||||
@staticmethod
|
||||
def fromhex(obj: str) -> bytes:
|
||||
"""Static method to create a bytes object from a string.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
obj : str
|
||||
String object to be converted to bytes.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
The string object as bytes.
|
||||
"""
|
||||
return bytes.fromhex(obj)
|
||||
|
||||
@staticmethod
|
||||
def _from_key(config_var) -> Tuple[str, str]:
|
||||
try:
|
||||
header, name = config_var.split('.')
|
||||
except ValueError:
|
||||
header = ''
|
||||
name = config_var
|
||||
return (header, name)
|
||||
|
||||
def __init__(
|
||||
self, config_path: str, config_contents: dict, config_var: str
|
||||
):
|
||||
"""Summary
|
||||
See :class:`~Mask` for parameters.
|
||||
"""
|
||||
self.table: str = __header__
|
||||
self.config_path = config_path
|
||||
self.config_contents = config_contents
|
||||
self.config_var = config_var.replace('.', '_')
|
||||
self.header = self._from_key(config_var)[0]
|
||||
self.name = self._from_key(config_var)[1]
|
||||
try:
|
||||
# If value is under a subsection
|
||||
self.entry = self.config_contents[self.table][self.header]
|
||||
except KeyError:
|
||||
# If value is under the main header
|
||||
self.entry = self.config_contents[self.table]
|
||||
|
||||
def _get_database_file(self):
|
||||
self.database = self.config_path
|
||||
self.database += (
|
||||
f'.{self.table}.db'
|
||||
if self.config_path[-1] == '/'
|
||||
else f'/.{self.table}.db'
|
||||
)
|
||||
self.database = path.expanduser(self.database)
|
||||
return self
|
||||
|
||||
def _open_database(self):
|
||||
self.database = Pylite(self.database)
|
||||
|
||||
def _get_table(self):
|
||||
tables = [i[0] for i in self.database.get_tables()]
|
||||
if self.table not in tables:
|
||||
# panaetius.logger.debug(
|
||||
# 'Table not present in the database;'
|
||||
# f'creating the table {self.table} now'
|
||||
# )
|
||||
self.database.add_table(
|
||||
f'{self.table}',
|
||||
Name='text',
|
||||
Hash='text',
|
||||
Salt='text',
|
||||
Value='text',
|
||||
)
|
||||
else:
|
||||
# panaetius.logger.debug('Table already exists in the database')
|
||||
pass
|
||||
self.table_name = self.table
|
||||
|
||||
def _check_entries(self):
|
||||
var = self.database.get_items(self.table, f'Name="{self.config_var}"')
|
||||
if len(var) == 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def _insert_entries(self):
|
||||
self.database.insert(
|
||||
self.table,
|
||||
self.config_var,
|
||||
self.as_string(self.hash),
|
||||
self.as_string(self.salt),
|
||||
self.entry[self.name],
|
||||
)
|
||||
|
||||
def _update_entries_in_db(self):
|
||||
self.database.remove(self.table, f'Name="{self.config_var}"')
|
||||
self._insert_entries()
|
||||
|
||||
def _run_query(self, query: str):
|
||||
cur = self.database.db.cursor()
|
||||
cur.execute(query)
|
||||
self.database.db.commit()
|
||||
self.result = cur.fetchall()
|
||||
return self
|
||||
|
||||
def _get_all_items(self, where_clause: str = None):
|
||||
if where_clause is not None:
|
||||
self.result = self.database.get_items(self.table, where_clause)
|
||||
else:
|
||||
self.result = self.database.get_items(self.table)
|
||||
return self
|
||||
|
||||
def _process(self):
|
||||
if not self._check_entries():
|
||||
# panaetius.logger.debug('does not exist')
|
||||
self._insert_entries()
|
||||
self._update_entries_in_config()
|
||||
self._get_all_items()
|
||||
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
|
||||
return self.entry[self.name]
|
||||
else:
|
||||
self._get_all_items(f'Name="{self.config_var}"')
|
||||
if self.result[0][1] == self.entry[self.name]:
|
||||
# panaetius.logger.debug('exists and hash matches')
|
||||
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
|
||||
return self.result
|
||||
else:
|
||||
# panaetius.logger.debug('exists and hash doesnt match')
|
||||
# panaetius.logger.debug(
|
||||
# f'file_hash={self.entry[self.name]}, {self.result[0][1]}'
|
||||
# )
|
||||
self._update_entries_in_db()
|
||||
self._update_entries_in_config()
|
||||
self._get_all_items(f'Name="{self.config_var}"')
|
||||
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
|
||||
return self.entry[self.name]
|
||||
|
||||
def _open_config_file(self) -> io.TextIOWrapper:
|
||||
self.config_path += (
|
||||
'/config.toml' if self.config_path[-1] != '/' else 'config.toml'
|
||||
)
|
||||
c = open(path.expanduser(self.config_path), 'w')
|
||||
return c
|
||||
|
||||
def _update_entries_in_config(self):
|
||||
self.entry.update({self.name: self.as_string(self.hash)})
|
||||
# panaetius.logger.debug(self.config_contents)
|
||||
# panaetius.logger.debug(self.entry)
|
||||
c = self._open_config_file()
|
||||
toml.dump(self.config_contents, c)
|
||||
c.close()
|
||||
|
||||
def get_value(self):
|
||||
"""Get the true value from the database if it exists, create if it'
|
||||
' doesn't exist or update if the hash has changed.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
The result from the database.
|
||||
"""
|
||||
# print(f'key in db {self.config_var}')
|
||||
self._get_database_file()
|
||||
self._open_database()
|
||||
self._get_table()
|
||||
self._process()
|
||||
return self.result[0][3]
|
||||
26
old_src/panaetius/header.py
Normal file
26
old_src/panaetius/header.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import os
|
||||
from importlib import util
|
||||
|
||||
__path = os.getcwd()
|
||||
|
||||
try:
|
||||
__spec = util.spec_from_file_location(
|
||||
'__header__', f'{os.getcwd()}/__header__.py'
|
||||
)
|
||||
__header__ = util.module_from_spec(__spec)
|
||||
__spec.loader.exec_module(__header__)
|
||||
__header__ = __header__.__header__
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
venv = os.environ.get('VIRTUAL_ENV').split('/')[-1]
|
||||
__header__ = venv
|
||||
except AttributeError:
|
||||
print(
|
||||
f'Cannot find a __header__.py file in {os.getcwd()} containing the'
|
||||
' __header__ value of your project name and you are not working'
|
||||
' from a virtual environment. Either make sure this file '
|
||||
'exists and the value is set or create and work from a virtual '
|
||||
'environment and try again. \n The __header__ value has been '
|
||||
'set to the default of panaetius.'
|
||||
)
|
||||
__header__ = 'panaetius'
|
||||
112
old_src/panaetius/library.py
Normal file
112
old_src/panaetius/library.py
Normal file
@@ -0,0 +1,112 @@
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
from typing import Any, TypeVar, Type, TYPE_CHECKING, Union, List
|
||||
import ast
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import logging
|
||||
|
||||
|
||||
config_inst_t = TypeVar('config_inst_t', bound='panaetius.config.Config')
|
||||
|
||||
|
||||
def export(fn: callable) -> callable:
|
||||
mod = sys.modules[fn.__module__]
|
||||
if hasattr(mod, '__all__'):
|
||||
mod.__all__.append(fn.__name__)
|
||||
else:
|
||||
mod.__all__ = [fn.__name__]
|
||||
return fn
|
||||
|
||||
|
||||
def set_config(
|
||||
config_inst: Type[config_inst_t],
|
||||
key: str,
|
||||
default: str = None,
|
||||
cast: Any = None,
|
||||
check: Union[None, List] = None,
|
||||
mask: bool = False,
|
||||
) -> None:
|
||||
"""Sets the config variable on the instance of a class.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
config_inst : Type[config_inst_t]
|
||||
Instance of the :class:`~panaetius.config.Config` class.
|
||||
key : str
|
||||
The key referencing the config variable.
|
||||
default : str, optional
|
||||
The default value.
|
||||
mask : bool, optional
|
||||
Boolean to indiciate if a value in the `config.toml` should be masked.
|
||||
If this is set to True then the first time the variable is read from
|
||||
the config file the value will be replaced with a hash. Any time that
|
||||
value is then read the hash will be compared to the one stored and if
|
||||
they match the true value will be returned. This is stored in a sqlite
|
||||
`.db` next to the config file and is hidden by default. If the hash
|
||||
provided doesn't match the default behaviour is to update the `.db`
|
||||
with the new value and hash the value again. If you delete the
|
||||
database file then you will need to set the value again in the
|
||||
`config.toml`.
|
||||
cast : Any, optional
|
||||
The type of the variable.
|
||||
check : Union[None, List], optional
|
||||
Type of object to check against. This is useful if you want to use TOML
|
||||
to define a list, but want to make sure that a string representation
|
||||
of a list will be loaded properly if it set as an environment variable.
|
||||
|
||||
Example:
|
||||
|
||||
*config.toml* has the following attribute set::
|
||||
|
||||
[package.users]
|
||||
auth = ['user1', 'user2']
|
||||
|
||||
If set as an environment variable you can pass this list as a string
|
||||
and set :code:`check=list`::
|
||||
|
||||
Environment variable:
|
||||
PACKAGE_USERS_AUTH = "['user1', 'user2']"
|
||||
|
||||
Usage in code::
|
||||
|
||||
set_config(CONFIG, 'users.auth', check=list)
|
||||
"""
|
||||
config_var = key.lower().replace('.', '_')
|
||||
if check is None:
|
||||
setattr(
|
||||
config_inst, config_var, config_inst.get(key, default, cast, mask)
|
||||
)
|
||||
else:
|
||||
if type(config_inst.get(key, default, cast, mask)) is not check:
|
||||
if check is list:
|
||||
var = ast.literal_eval(
|
||||
config_inst.get(key, default, cast, mask)
|
||||
)
|
||||
setattr(config_inst, config_var, var)
|
||||
else:
|
||||
setattr(
|
||||
config_inst,
|
||||
config_var,
|
||||
config_inst.get(key, default, cast, mask),
|
||||
)
|
||||
|
||||
|
||||
# Create function to print cached logged messages and reset
|
||||
def process_cached_logs(
|
||||
config_inst: Type[config_inst_t], logger: logging.Logger
|
||||
):
|
||||
"""Prints the cached messages from :class:`~panaetius.config.Config`
|
||||
and resets the cache.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
config_inst : Type[config_inst_t]
|
||||
Instance of :class:`~panaetius.config.Config`.
|
||||
logger : logging.Logger
|
||||
Instance of the logger.
|
||||
"""
|
||||
for msg in config_inst.deferred_messages:
|
||||
logger.info(msg)
|
||||
config_inst.reset_log()
|
||||
54
old_src/panaetius/logging.py
Normal file
54
old_src/panaetius/logging.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import os
|
||||
import sys
|
||||
|
||||
import panaetius
|
||||
from panaetius import CONFIG as CONFIG
|
||||
from panaetius import __header__ as __header__
|
||||
from panaetius import set_config as set_config
|
||||
|
||||
|
||||
panaetius.set_config(CONFIG, 'logging.path')
|
||||
panaetius.set_config(
|
||||
CONFIG,
|
||||
'logging.format',
|
||||
'{\n\t"time": "%(asctime)s",\n\t"file_name": "%(filename)s",'
|
||||
'\n\t"module": "%(module)s",\n\t"function":"%(funcName)s",\n\t'
|
||||
'"line_number": "%(lineno)s",\n\t"logging_level":'
|
||||
'"%(levelname)s",\n\t"message": "%(message)s"\n}',
|
||||
cast=str,
|
||||
)
|
||||
set_config(CONFIG, 'logging.level', 'INFO')
|
||||
|
||||
# Logging Configuration
|
||||
logger = logging.getLogger(__header__)
|
||||
loghandler_sys = logging.StreamHandler(sys.stdout)
|
||||
|
||||
# Checking if log path is set
|
||||
if CONFIG.logging_path:
|
||||
CONFIG.logging_path += (
|
||||
f'{__header__}.log'
|
||||
if CONFIG.logging_path[-1] == '/'
|
||||
else f'/{__header__}.log'
|
||||
)
|
||||
# Set default log file options
|
||||
set_config(CONFIG, 'logging.backup_count', 3, int)
|
||||
set_config(CONFIG, 'logging.rotate_bytes', 512000, int)
|
||||
|
||||
# Configure file handler
|
||||
loghandler_file = RotatingFileHandler(
|
||||
os.path.expanduser(CONFIG.logging_path),
|
||||
'a',
|
||||
CONFIG.logging_rotate_bytes,
|
||||
CONFIG.logging_backup_count,
|
||||
)
|
||||
|
||||
# Add to file formatter
|
||||
loghandler_file.setFormatter(logging.Formatter(CONFIG.logging_format))
|
||||
logger.addHandler(loghandler_file)
|
||||
|
||||
# Configure and add to stdout formatter
|
||||
loghandler_sys.setFormatter(logging.Formatter(CONFIG.logging_format))
|
||||
logger.addHandler(loghandler_sys)
|
||||
logger.setLevel(CONFIG.logging_level)
|
||||
Reference in New Issue
Block a user