removing old source code

This commit is contained in:
2021-10-18 02:32:15 +01:00
parent 78b86967e7
commit 31fe9b1afc
8 changed files with 0 additions and 642 deletions

View File

@@ -1,6 +0,0 @@
from panaetius.config_inst import CONFIG
from .config import Config
from .library import set_config
from panaetius.header import __header__
import panaetius.logging
from panaetius.logging import logger as logger

View File

@@ -1 +0,0 @@
__version__ = '1.0.2'

View File

@@ -1,178 +0,0 @@
from typing import Callable, Union
import os
import toml
from panaetius.library import export
from panaetius.header import __header__
from panaetius.db import Mask
# __all__ = ['Config']
class Config:
"""Handles the config options for the module and stores config variables
to be shared.
Attributes
----------
config_file : dict
Contains the config options. See
:meth:`~panaetius.config.Config.read_config`
for the data structure.
deferred_messages : list
A list containing the messages to be logged once the logger has been
instantiated.
Mask : panaetius.db.Mask
Class to mask values in a config file.
module_name : str
A string representing the module name. This is added in front of all
envrionment variables and is the title of the `config.toml`.
path : str
Path to config file
Parameters
----------
path : str
Path to config file
"""
def __init__(self, path: str, header: str = __header__) -> None:
"""
See :class:`~panaetius.config.Config` for parameters.
"""
self.path = os.path.expanduser(path)
self.header = header
self.deferred_messages = []
self.config_file = self.read_config(path)
self.module_name = self.header.lower()
self.Mask = Mask
def read_config(self, path: str, write: bool = False) -> Union[dict, None]:
"""Reads the toml config file from `path` if it exists.
"""
path += 'config.toml' if path[-1] == '/' else '/config.toml'
path = os.path.expanduser(path)
if not write:
try:
with open(path, 'r+') as config_file:
config_file = toml.load(config_file)
self.defer_log(f'Config file found at {path}')
return config_file
except FileNotFoundError:
self.defer_log(f'Config file not found at {path}')
else:
try:
with open(path, 'w+') as config_file:
config_file = toml.load(config_file)
self.defer_log(f'Config file found at {path}')
return config_file
except FileNotFoundError:
self.defer_log(f'Config file not found at {path}')
def get(
self,
key: str,
default: str = None,
cast: Callable = None,
mask: bool = False,
) -> Union[str, None]:
"""Retrives the config variable from either the `config.toml` or an
environment variable. Will default to the default value if nothing
is found
Parameters
----------
key : str
Key to the configuration variable. Should be in the form
`panaetius.variable` or `panaetius.header.variable`.
When loaded, it will be accessable at
`Config.panaetius_variable` or
`Config.panaetius_header_variable`.
default : str, optional
The default value if nothing is found. Defaults to `None`.
cast : Callable, optional
The type of the variable. E.g `int` or `float`. Should reference
the type object and not as string. Defaults to `None`.
Returns
-------
Any
Will return the config variable if found, or the default.
"""
env_key = f"{self.header.upper()}_{key.upper().replace('.', '_')}"
try:
# look in the config.toml
if len(key.split('.')) == 2:
# look for subsections
# print(mask)
if mask:
# print('mask', key)
value = self.Mask(
self.path, self.config_file, key
).get_value()
else:
# print('no-mask')
section, name = key.lower().split('.')
value = self.config_file[self.module_name][section][name]
self.defer_log(f'{env_key} found in config.toml')
else:
# print('valueerror')
# look under top level module self.header
# key = f'{self.module_name}.key'
if mask:
# key = f'{self.header}.{key}'
# print(f'mask key={key}')
value = self.Mask(
self.path, self.config_file, key
).get_value()
else:
name = key.lower()
value = self.config_file[self.module_name][name]
self.defer_log(f'{env_key} found in config.toml')
# finally:
try:
# return if found in config.toml
return cast(value) if cast else value
except UnboundLocalError:
# pass if nothing was found
# print('unbound error')
pass
except KeyError:
# print('key error')
self.defer_log(f'{env_key} not found in config.toml')
except TypeError:
# print('type error')
self.defer_log(f'{env_key} not found in config.toml')
# look for an environment variable
value = os.environ.get(env_key.replace("-", "_"))
if value is not None:
self.defer_log(f'{env_key} found in an environment variable')
else:
# fall back to default
self.defer_log(f'{env_key} not found in an environment variable.')
value = default
self.defer_log(f'{env_key} set to default {default}')
return cast(value) if cast else value
def defer_log(self, msg: str) -> None:
"""Populates a list `Config.deferred_messages` with all the events to
be passed to the logger later if required.
Parameters
----------
msg : str
The message to be logged.
"""
self.deferred_messages.append(msg)
def reset_log(self) -> None:
"""Empties the list `Config.deferred_messages`.
"""
del self.deferred_messages
self.deferred_messages = []

View File

@@ -1,9 +0,0 @@
import os
from panaetius.header import __header__
from panaetius.config import Config
DEFAULT_CONFIG_PATH = f"~/.config/{__header__.lower()}"
CONFIG_PATH = os.environ.get(f"{__header__.upper()}_CONFIG_PATH", DEFAULT_CONFIG_PATH)
CONFIG = Config(CONFIG_PATH)

View File

@@ -1,256 +0,0 @@
from os import path, urandom
import hashlib
from typing import Tuple
import toml
import io
from pylite.simplite import Pylite
from panaetius.header import __header__ as __header__
import panaetius
class Mask:
"""Class to handle masking sensitive values in a config file
Attributes
----------
config_contents : dict
A dict containing the contents of the config file.
config_path : str
The path to the config file.
config_var : str
The key corresponding to the config entry.
database : Pylite
A Pylite instance for the datbase.
entry : str
The result from the config file. Could either be a hash or the raw
value.
header : str
The __header__ which denotes where the config file is stored.
name : str
The key of the entry in the config file.
result : str
The value of the entry in the config file.
table_name : str
The sqlite table name. Defaults to the __header__ value.
"""
@property
def hash(self):
"""Property to determine the hash of a config entry.
Returns
-------
bytes
The hash as a bytes object.
"""
try:
if not self._hash_exists:
pass
except AttributeError:
self._hash = hashlib.pbkdf2_hmac(
'sha256',
self.entry[self.name].encode('utf-8'),
self.salt,
100000,
dklen=12,
)
self._hash_exists = True
finally:
return self._hash
@property
def salt(self):
"""Property to detemine a random salt to use in creation of the hash.
Returns
-------
bytes
The salt as a bytes object.
"""
self._salt = urandom(32)
return self._salt
@staticmethod
def as_string(obj: bytes) -> str:
"""Static method to return a string from a bytes object.
Parameters
----------
obj : bytes
Bytes object to be converted to a string.
Returns
-------
str
The bytes object as a string.
"""
return bytes.hex(obj)
@staticmethod
def fromhex(obj: str) -> bytes:
"""Static method to create a bytes object from a string.
Parameters
----------
obj : str
String object to be converted to bytes.
Returns
-------
bytes
The string object as bytes.
"""
return bytes.fromhex(obj)
@staticmethod
def _from_key(config_var) -> Tuple[str, str]:
try:
header, name = config_var.split('.')
except ValueError:
header = ''
name = config_var
return (header, name)
def __init__(
self, config_path: str, config_contents: dict, config_var: str
):
"""Summary
See :class:`~Mask` for parameters.
"""
self.table: str = __header__
self.config_path = config_path
self.config_contents = config_contents
self.config_var = config_var.replace('.', '_')
self.header = self._from_key(config_var)[0]
self.name = self._from_key(config_var)[1]
try:
# If value is under a subsection
self.entry = self.config_contents[self.table][self.header]
except KeyError:
# If value is under the main header
self.entry = self.config_contents[self.table]
def _get_database_file(self):
self.database = self.config_path
self.database += (
f'.{self.table}.db'
if self.config_path[-1] == '/'
else f'/.{self.table}.db'
)
self.database = path.expanduser(self.database)
return self
def _open_database(self):
self.database = Pylite(self.database)
def _get_table(self):
tables = [i[0] for i in self.database.get_tables()]
if self.table not in tables:
# panaetius.logger.debug(
# 'Table not present in the database;'
# f'creating the table {self.table} now'
# )
self.database.add_table(
f'{self.table}',
Name='text',
Hash='text',
Salt='text',
Value='text',
)
else:
# panaetius.logger.debug('Table already exists in the database')
pass
self.table_name = self.table
def _check_entries(self):
var = self.database.get_items(self.table, f'Name="{self.config_var}"')
if len(var) == 0:
return False
else:
return True
def _insert_entries(self):
self.database.insert(
self.table,
self.config_var,
self.as_string(self.hash),
self.as_string(self.salt),
self.entry[self.name],
)
def _update_entries_in_db(self):
self.database.remove(self.table, f'Name="{self.config_var}"')
self._insert_entries()
def _run_query(self, query: str):
cur = self.database.db.cursor()
cur.execute(query)
self.database.db.commit()
self.result = cur.fetchall()
return self
def _get_all_items(self, where_clause: str = None):
if where_clause is not None:
self.result = self.database.get_items(self.table, where_clause)
else:
self.result = self.database.get_items(self.table)
return self
def _process(self):
if not self._check_entries():
# panaetius.logger.debug('does not exist')
self._insert_entries()
self._update_entries_in_config()
self._get_all_items()
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
return self.entry[self.name]
else:
self._get_all_items(f'Name="{self.config_var}"')
if self.result[0][1] == self.entry[self.name]:
# panaetius.logger.debug('exists and hash matches')
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
return self.result
else:
# panaetius.logger.debug('exists and hash doesnt match')
# panaetius.logger.debug(
# f'file_hash={self.entry[self.name]}, {self.result[0][1]}'
# )
self._update_entries_in_db()
self._update_entries_in_config()
self._get_all_items(f'Name="{self.config_var}"')
# panaetius.logger.debug(f'returning: {self.result[0][3]}')
return self.entry[self.name]
def _open_config_file(self) -> io.TextIOWrapper:
self.config_path += (
'/config.toml' if self.config_path[-1] != '/' else 'config.toml'
)
c = open(path.expanduser(self.config_path), 'w')
return c
def _update_entries_in_config(self):
self.entry.update({self.name: self.as_string(self.hash)})
# panaetius.logger.debug(self.config_contents)
# panaetius.logger.debug(self.entry)
c = self._open_config_file()
toml.dump(self.config_contents, c)
c.close()
def get_value(self):
"""Get the true value from the database if it exists, create if it'
' doesn't exist or update if the hash has changed.
Returns
-------
str
The result from the database.
"""
# print(f'key in db {self.config_var}')
self._get_database_file()
self._open_database()
self._get_table()
self._process()
return self.result[0][3]

View File

@@ -1,26 +0,0 @@
import os
from importlib import util
__path = os.getcwd()
try:
__spec = util.spec_from_file_location(
'__header__', f'{os.getcwd()}/__header__.py'
)
__header__ = util.module_from_spec(__spec)
__spec.loader.exec_module(__header__)
__header__ = __header__.__header__
except FileNotFoundError:
try:
venv = os.environ.get('VIRTUAL_ENV').split('/')[-1]
__header__ = venv
except AttributeError:
print(
f'Cannot find a __header__.py file in {os.getcwd()} containing the'
' __header__ value of your project name and you are not working'
' from a virtual environment. Either make sure this file '
'exists and the value is set or create and work from a virtual '
'environment and try again. \n The __header__ value has been '
'set to the default of panaetius.'
)
__header__ = 'panaetius'

View File

@@ -1,112 +0,0 @@
from __future__ import annotations
import sys
from typing import Any, TypeVar, Type, TYPE_CHECKING, Union, List
import ast
if TYPE_CHECKING:
import logging
config_inst_t = TypeVar('config_inst_t', bound='panaetius.config.Config')
def export(fn: callable) -> callable:
mod = sys.modules[fn.__module__]
if hasattr(mod, '__all__'):
mod.__all__.append(fn.__name__)
else:
mod.__all__ = [fn.__name__]
return fn
def set_config(
config_inst: Type[config_inst_t],
key: str,
default: str = None,
cast: Any = None,
check: Union[None, List] = None,
mask: bool = False,
) -> None:
"""Sets the config variable on the instance of a class.
Parameters
----------
config_inst : Type[config_inst_t]
Instance of the :class:`~panaetius.config.Config` class.
key : str
The key referencing the config variable.
default : str, optional
The default value.
mask : bool, optional
Boolean to indiciate if a value in the `config.toml` should be masked.
If this is set to True then the first time the variable is read from
the config file the value will be replaced with a hash. Any time that
value is then read the hash will be compared to the one stored and if
they match the true value will be returned. This is stored in a sqlite
`.db` next to the config file and is hidden by default. If the hash
provided doesn't match the default behaviour is to update the `.db`
with the new value and hash the value again. If you delete the
database file then you will need to set the value again in the
`config.toml`.
cast : Any, optional
The type of the variable.
check : Union[None, List], optional
Type of object to check against. This is useful if you want to use TOML
to define a list, but want to make sure that a string representation
of a list will be loaded properly if it set as an environment variable.
Example:
*config.toml* has the following attribute set::
[package.users]
auth = ['user1', 'user2']
If set as an environment variable you can pass this list as a string
and set :code:`check=list`::
Environment variable:
PACKAGE_USERS_AUTH = "['user1', 'user2']"
Usage in code::
set_config(CONFIG, 'users.auth', check=list)
"""
config_var = key.lower().replace('.', '_')
if check is None:
setattr(
config_inst, config_var, config_inst.get(key, default, cast, mask)
)
else:
if type(config_inst.get(key, default, cast, mask)) is not check:
if check is list:
var = ast.literal_eval(
config_inst.get(key, default, cast, mask)
)
setattr(config_inst, config_var, var)
else:
setattr(
config_inst,
config_var,
config_inst.get(key, default, cast, mask),
)
# Create function to print cached logged messages and reset
def process_cached_logs(
config_inst: Type[config_inst_t], logger: logging.Logger
):
"""Prints the cached messages from :class:`~panaetius.config.Config`
and resets the cache.
Parameters
----------
config_inst : Type[config_inst_t]
Instance of :class:`~panaetius.config.Config`.
logger : logging.Logger
Instance of the logger.
"""
for msg in config_inst.deferred_messages:
logger.info(msg)
config_inst.reset_log()

View File

@@ -1,54 +0,0 @@
import logging
from logging.handlers import RotatingFileHandler
import os
import sys
import panaetius
from panaetius import CONFIG as CONFIG
from panaetius import __header__ as __header__
from panaetius import set_config as set_config
panaetius.set_config(CONFIG, 'logging.path')
panaetius.set_config(
CONFIG,
'logging.format',
'{\n\t"time": "%(asctime)s",\n\t"file_name": "%(filename)s",'
'\n\t"module": "%(module)s",\n\t"function":"%(funcName)s",\n\t'
'"line_number": "%(lineno)s",\n\t"logging_level":'
'"%(levelname)s",\n\t"message": "%(message)s"\n}',
cast=str,
)
set_config(CONFIG, 'logging.level', 'INFO')
# Logging Configuration
logger = logging.getLogger(__header__)
loghandler_sys = logging.StreamHandler(sys.stdout)
# Checking if log path is set
if CONFIG.logging_path:
CONFIG.logging_path += (
f'{__header__}.log'
if CONFIG.logging_path[-1] == '/'
else f'/{__header__}.log'
)
# Set default log file options
set_config(CONFIG, 'logging.backup_count', 3, int)
set_config(CONFIG, 'logging.rotate_bytes', 512000, int)
# Configure file handler
loghandler_file = RotatingFileHandler(
os.path.expanduser(CONFIG.logging_path),
'a',
CONFIG.logging_rotate_bytes,
CONFIG.logging_backup_count,
)
# Add to file formatter
loghandler_file.setFormatter(logging.Formatter(CONFIG.logging_format))
logger.addHandler(loghandler_file)
# Configure and add to stdout formatter
loghandler_sys.setFormatter(logging.Formatter(CONFIG.logging_format))
logger.addHandler(loghandler_sys)
logger.setLevel(CONFIG.logging_level)