@ -20,6 +20,7 @@ import re
import shlex
import shlex
import sys
import sys
from decimal import Decimal
from decimal import Decimal
from pathlib import Path
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile
from time import sleep
from time import sleep
@ -39,7 +40,7 @@ import colorama
from appdirs import user_config_dir
from appdirs import user_config_dir
# Disable logging until we can configure it how the user wants
# Disable logging until we can configure it how the user wants
logging . basicConfig ( filename = ' /dev/null ' )
logging . basicConfig ( stream = os . devnull )
# Log Level Representations
# Log Level Representations
EMOJI_LOGLEVELS = {
EMOJI_LOGLEVELS = {
@ -96,7 +97,6 @@ def format_ansi(text):
class ANSIFormatter ( logging . Formatter ) :
class ANSIFormatter ( logging . Formatter ) :
""" A log formatter that inserts ANSI color.
""" A log formatter that inserts ANSI color.
"""
"""
def format ( self , record ) :
def format ( self , record ) :
msg = super ( ANSIFormatter , self ) . format ( record )
msg = super ( ANSIFormatter , self ) . format ( record )
return format_ansi ( msg )
return format_ansi ( msg )
@ -105,7 +105,6 @@ class ANSIFormatter(logging.Formatter):
class ANSIEmojiLoglevelFormatter ( ANSIFormatter ) :
class ANSIEmojiLoglevelFormatter ( ANSIFormatter ) :
""" A log formatter that makes the loglevel an emoji on UTF capable terminals.
""" A log formatter that makes the loglevel an emoji on UTF capable terminals.
"""
"""
def format ( self , record ) :
def format ( self , record ) :
if UNICODE_SUPPORT :
if UNICODE_SUPPORT :
record . levelname = EMOJI_LOGLEVELS [ record . levelname ] . format ( * * ansi_colors )
record . levelname = EMOJI_LOGLEVELS [ record . levelname ] . format ( * * ansi_colors )
@ -115,7 +114,6 @@ class ANSIEmojiLoglevelFormatter(ANSIFormatter):
class ANSIStrippingFormatter ( ANSIFormatter ) :
class ANSIStrippingFormatter ( ANSIFormatter ) :
""" A log formatter that strips ANSI.
""" A log formatter that strips ANSI.
"""
"""
def format ( self , record ) :
def format ( self , record ) :
msg = super ( ANSIStrippingFormatter , self ) . format ( record )
msg = super ( ANSIStrippingFormatter , self ) . format ( record )
return ansi_escape . sub ( ' ' , msg )
return ansi_escape . sub ( ' ' , msg )
@ -127,7 +125,6 @@ class Configuration(object):
This class never raises IndexError , instead it will return None if a
This class never raises IndexError , instead it will return None if a
section or option does not yet exist .
section or option does not yet exist .
"""
"""
def __contains__ ( self , key ) :
def __contains__ ( self , key ) :
return self . _config . __contains__ ( key )
return self . _config . __contains__ ( key )
@ -214,9 +211,8 @@ def handle_store_boolean(self, *args, **kwargs):
class SubparserWrapper ( object ) :
class SubparserWrapper ( object ) :
""" Wrap subparsers so we can populate the normal and the shadow parser .
""" Wrap subparsers so we can track what options the user passed .
"""
"""
def __init__ ( self , cli , submodule , subparser ) :
def __init__ ( self , cli , submodule , subparser ) :
self . cli = cli
self . cli = cli
self . submodule = submodule
self . submodule = submodule
@ -232,26 +228,30 @@ class SubparserWrapper(object):
self . subparser . completer = completer
self . subparser . completer = completer
def add_argument ( self , * args , * * kwargs ) :
def add_argument ( self , * args , * * kwargs ) :
""" Add an argument for this subcommand.
This also stores the default for the argument in ` self . cli . default_arguments ` .
"""
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_boolean ' :
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_boolean ' :
# Store boolean will call us again with the enable/disable flag arguments
return handle_store_boolean ( self , * args , * * kwargs )
return handle_store_boolean ( self , * args , * * kwargs )
self . cli . acquire_lock ( )
self . cli . acquire_lock ( )
self . subparser . add_argument ( * args , * * kwargs )
self . subparser . add_argument ( * args , * * kwargs )
if self . submodule not in self . cli . default_arguments :
if ' default ' in kwargs :
self . cli . default_arguments [ self . submodule ] = { }
del kwargs [ ' default ' ]
self . cli . default_arguments [ self . submodule ] [ self . cli . get_argument_name ( * args , * * kwargs ) ] = kwargs . get ( ' default ' )
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_false ' :
kwargs [ ' action ' ] == ' store_true '
self . cli . subcommands_default [ self . submodule ] . add_argument ( * args , * * kwargs )
self . cli . release_lock ( )
self . cli . release_lock ( )
class MILC ( object ) :
class MILC ( object ) :
""" MILC - An Opinionated Batteries Included Framework
""" MILC - An Opinionated Batteries Included Framework
"""
"""
def __init__ ( self ) :
def __init__ ( self ) :
""" Initialize the MILC object.
""" Initialize the MILC object.
version
The version string to associate with your CLI program
"""
"""
# Setup a lock for thread safety
# Setup a lock for thread safety
self . _lock = threading . RLock ( ) if thread else None
self . _lock = threading . RLock ( ) if thread else None
@ -263,9 +263,10 @@ class MILC(object):
self . _inside_context_manager = False
self . _inside_context_manager = False
self . ansi = ansi_colors
self . ansi = ansi_colors
self . arg_only = [ ]
self . arg_only = [ ]
self . config = Configuration ( )
self . config = None
self . config_file = None
self . config_file = None
self . version = os . environ . get ( ' QMK_VERSION ' , ' unknown ' )
self . default_arguments = { }
self . version = ' unknown '
self . release_lock ( )
self . release_lock ( )
# Figure out our program name
# Figure out our program name
@ -273,6 +274,7 @@ class MILC(object):
self . prog_name = self . prog_name . split ( ' / ' ) [ - 1 ]
self . prog_name = self . prog_name . split ( ' / ' ) [ - 1 ]
# Initialize all the things
# Initialize all the things
self . read_config_file ( )
self . initialize_argparse ( )
self . initialize_argparse ( )
self . initialize_logging ( )
self . initialize_logging ( )
@ -282,7 +284,7 @@ class MILC(object):
@description . setter
@description . setter
def description ( self , value ) :
def description ( self , value ) :
self . _description = self . _arg_parser . description = self . _arg_defaults . description = value
self . _description = self . _arg_parser . description = value
def echo ( self , text , * args , * * kwargs ) :
def echo ( self , text , * args , * * kwargs ) :
""" Print colorized text to stdout.
""" Print colorized text to stdout.
@ -311,12 +313,9 @@ class MILC(object):
self . acquire_lock ( )
self . acquire_lock ( )
self . subcommands = { }
self . subcommands = { }
self . subcommands_default = { }
self . _subparsers = None
self . _subparsers = None
self . _subparsers_default = None
self . argwarn = argcomplete . warn
self . argwarn = argcomplete . warn
self . args = None
self . args = None
self . _arg_defaults = argparse . ArgumentParser ( * * kwargs )
self . _arg_parser = argparse . ArgumentParser ( * * kwargs )
self . _arg_parser = argparse . ArgumentParser ( * * kwargs )
self . set_defaults = self . _arg_parser . set_defaults
self . set_defaults = self . _arg_parser . set_defaults
self . print_usage = self . _arg_parser . print_usage
self . print_usage = self . _arg_parser . print_usage
@ -329,25 +328,18 @@ class MILC(object):
self . _arg_parser . completer = completer
self . _arg_parser . completer = completer
def add_argument ( self , * args , * * kwargs ) :
def add_argument ( self , * args , * * kwargs ) :
""" Wrapper to add arguments to both the main and the shadow argparser .
""" Wrapper to add arguments and track whether they were passed on the command line .
"""
"""
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_boolean ' :
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_boolean ' :
return handle_store_boolean ( self , * args , * * kwargs )
return handle_store_boolean ( self , * args , * * kwargs )
if kwargs . get ( ' add_dest ' , True ) and args [ 0 ] [ 0 ] == ' - ' :
kwargs [ ' dest ' ] = ' general_ ' + self . get_argument_name ( * args , * * kwargs )
if ' add_dest ' in kwargs :
del kwargs [ ' add_dest ' ]
self . acquire_lock ( )
self . acquire_lock ( )
self . _arg_parser . add_argument ( * args , * * kwargs )
self . _arg_parser . add_argument ( * args , * * kwargs )
if ' general ' not in self . default_arguments :
self . default_arguments [ ' general ' ] = { }
self . default_arguments [ ' general ' ] [ self . get_argument_name ( * args , * * kwargs ) ] = kwargs . get ( ' default ' )
# Populate the shadow parser
if ' default ' in kwargs :
del kwargs [ ' default ' ]
if ' action ' in kwargs and kwargs [ ' action ' ] == ' store_false ' :
kwargs [ ' action ' ] == ' store_true '
self . _arg_defaults . add_argument ( * args , * * kwargs )
self . release_lock ( )
self . release_lock ( )
def initialize_logging ( self ) :
def initialize_logging ( self ) :
@ -374,15 +366,14 @@ class MILC(object):
self . add_argument ( ' --log-file-fmt ' , default = ' [ %(levelname)s ] [ %(asctime)s ] [file: %(pathname)s ] [line: %(lineno)d ] %(message)s ' , help = ' Format string for log file. ' )
self . add_argument ( ' --log-file-fmt ' , default = ' [ %(levelname)s ] [ %(asctime)s ] [file: %(pathname)s ] [line: %(lineno)d ] %(message)s ' , help = ' Format string for log file. ' )
self . add_argument ( ' --log-file ' , help = ' File to write log messages to ' )
self . add_argument ( ' --log-file ' , help = ' File to write log messages to ' )
self . add_argument ( ' --color ' , action = ' store_boolean ' , default = True , help = ' color in output ' )
self . add_argument ( ' --color ' , action = ' store_boolean ' , default = True , help = ' color in output ' )
self . add_argument ( ' -c ' , ' - -config-file ' , help = ' The config file to read and/or writ e ' )
self . add_argument ( ' --config-file ' , help = ' The location for the configuration fil e ' )
self . add_argument ( ' --save-config ' , action = ' store_true ' , help = ' Save the running configuration to the config file ' )
self . arg_only . append ( ' config_ file ' )
def add_subparsers ( self , title = ' Sub-commands ' , * * kwargs ) :
def add_subparsers ( self , title = ' Sub-commands ' , * * kwargs ) :
if self . _inside_context_manager :
if self . _inside_context_manager :
raise RuntimeError ( ' You must run this before the with statement! ' )
raise RuntimeError ( ' You must run this before the with statement! ' )
self . acquire_lock ( )
self . acquire_lock ( )
self . _subparsers_default = self . _arg_defaults . add_subparsers ( title = title , dest = ' subparsers ' , * * kwargs )
self . _subparsers = self . _arg_parser . add_subparsers ( title = title , dest = ' subparsers ' , * * kwargs )
self . _subparsers = self . _arg_parser . add_subparsers ( title = title , dest = ' subparsers ' , * * kwargs )
self . release_lock ( )
self . release_lock ( )
@ -404,10 +395,12 @@ class MILC(object):
if self . config_file :
if self . config_file :
return self . config_file
return self . config_file
if self . args and self . args . general_config_file :
if ' --config-file ' in sys . argv :
return self . args . general_config_file
return Path ( sys . argv [ sys . argv . index ( ' --config-file ' ) + 1 ] ) . expanduser ( ) . resolve ( )
return os . path . join ( user_config_dir ( appname = ' qmk ' , appauthor = ' QMK ' ) , ' %s .ini ' % self . prog_name )
filedir = user_config_dir ( appname = ' qmk ' , appauthor = ' QMK ' )
filename = ' %s .ini ' % self . prog_name
return Path ( filedir ) / filename
def get_argument_name ( self , * args , * * kwargs ) :
def get_argument_name ( self , * args , * * kwargs ) :
""" Takes argparse arguments and returns the dest name.
""" Takes argparse arguments and returns the dest name.
@ -446,7 +439,7 @@ class MILC(object):
def arg_passed ( self , arg ) :
def arg_passed ( self , arg ) :
""" Returns True if arg was passed on the command line.
""" Returns True if arg was passed on the command line.
"""
"""
return self . args_passed [ arg ] in ( None , False )
return self . default_arguments . get ( arg ) != self . args [ arg ]
def parse_args ( self ) :
def parse_args ( self ) :
""" Parse the CLI args.
""" Parse the CLI args.
@ -459,25 +452,22 @@ class MILC(object):
self . acquire_lock ( )
self . acquire_lock ( )
self . args = self . _arg_parser . parse_args ( )
self . args = self . _arg_parser . parse_args ( )
self . args_passed = self . _arg_defaults . parse_args ( )
if ' entrypoint ' in self . args :
if ' entrypoint ' in self . args :
self . _entrypoint = self . args . entrypoint
self . _entrypoint = self . args . entrypoint
if self . args . general_config_file :
self . config_file = self . args . general_config_file
self . release_lock ( )
self . release_lock ( )
def read_config ( self ) :
def read_config_file ( self ) :
""" Parse the configuration file and determine the runtime configuration .
""" Read in the configuration file and store it in self.config .
"""
"""
self . acquire_lock ( )
self . acquire_lock ( )
self . config = Configuration ( )
self . config_file = self . find_config_file ( )
self . config_file = self . find_config_file ( )
if self . config_file and os . path . exists ( self . config_file ) :
if self . config_file and self . config_file . exists ( ) :
config = RawConfigParser ( self . config )
config = RawConfigParser ( self . config )
config . read ( self . config_file )
config . read ( str ( self . config_file ) )
# Iterate over the config file options and write them into self.config
# Iterate over the config file options and write them into self.config
for section in config . sections ( ) :
for section in config . sections ( ) :
@ -487,8 +477,10 @@ class MILC(object):
# Coerce values into useful datatypes
# Coerce values into useful datatypes
if value . lower ( ) in [ ' 1 ' , ' yes ' , ' true ' , ' on ' ] :
if value . lower ( ) in [ ' 1 ' , ' yes ' , ' true ' , ' on ' ] :
value = True
value = True
elif value . lower ( ) in [ ' 0 ' , ' no ' , ' false ' , ' none ' , ' off' ] :
elif value . lower ( ) in [ ' 0 ' , ' no ' , ' false ' , ' off ' ] :
value = False
value = False
elif value . lower ( ) in [ ' none ' ] :
continue
elif value . replace ( ' . ' , ' ' ) . isdigit ( ) :
elif value . replace ( ' . ' , ' ' ) . isdigit ( ) :
if ' . ' in value :
if ' . ' in value :
value = Decimal ( value )
value = Decimal ( value )
@ -497,32 +489,44 @@ class MILC(object):
self . config [ section ] [ option ] = value
self . config [ section ] [ option ] = value
# Fold the CLI args into self.config
self . release_lock ( )
def merge_args_into_config ( self ) :
""" Merge CLI arguments into self.config to create the runtime configuration.
"""
self . acquire_lock ( )
for argument in vars ( self . args ) :
for argument in vars ( self . args ) :
if argument in ( ' subparsers ' , ' entrypoint ' ) :
if argument in ( ' subparsers ' , ' entrypoint ' ) :
continue
continue
if ' _ ' in argument :
if argument not in self . arg_only :
section , option = argument . split ( ' _ ' , 1 )
# Find the argument's section
else :
if self . _entrypoint . __name__ in self . default_arguments and argument in self . default_arguments [ self . _entrypoint . __name__ ] :
section = self . _entrypoint . __name__
argument_found = True
option = argument
section = self . _entrypoint . __name__
if argument in self . default_arguments [ ' general ' ] :
if option not in self . arg_only :
argument_found = True
if hasattr ( self . args_passed , argument ) :
section = ' general '
if not argument_found :
raise RuntimeError ( ' Could not find argument in `self.default_arguments`. This should be impossible! ' )
exit ( 1 )
# Merge this argument into self.config
if argument in self . default_arguments :
arg_value = getattr ( self . args , argument )
arg_value = getattr ( self . args , argument )
if arg_value :
if arg_value :
self . config [ section ] [ option ] = arg_value
self . config [ section ] [ argument ] = arg_value
else :
else :
if option not in self . config [ section ] :
if argument not in self . config [ section ] :
self . config [ section ] [ option ] = getattr ( self . args , argument )
self . config [ section ] [ argument ] = getattr ( self . args , argument )
self . release_lock ( )
self . release_lock ( )
def save_config ( self ) :
def save_config ( self ) :
""" Save the current configuration to the config file.
""" Save the current configuration to the config file.
"""
"""
self . log . debug ( " Saving config file to ' %s ' " , self . config_file )
self . log . debug ( " Saving config file to ' %s ' " , str ( self . config_file ) )
if not self . config_file :
if not self . config_file :
self . log . warning ( ' %s .config_file file not set, not saving config! ' , self . __class__ . __name__ )
self . log . warning ( ' %s .config_file file not set, not saving config! ' , self . __class__ . __name__ )
@ -530,31 +534,34 @@ class MILC(object):
self . acquire_lock ( )
self . acquire_lock ( )
# Generate a sanitized version of our running configuration
config = RawConfigParser ( )
config = RawConfigParser ( )
config_dir = os . path . dirname ( self . config_file )
for section_name , section in self . config . _config . items ( ) :
for section_name , section in self . config . _config . items ( ) :
config . add_section ( section_name )
config . add_section ( section_name )
for option_name , value in section . items ( ) :
for option_name , value in section . items ( ) :
if section_name == ' general ' :
if section_name == ' general ' :
if option_name in [ ' save_ config' ] :
if option_name in [ ' config_file ' ] :
continue
continue
config . set ( section_name , option_name , str ( value ) )
if value is not None :
config . set ( section_name , option_name , str ( value ) )
if not os . path . exists ( config_dir ) :
# Write out the config file
os . makedirs ( config_dir )
config_dir = self . config_file . parent
if not config_dir . exists ( ) :
config_dir . mkdir ( parents = True , exist_ok = True )
with NamedTemporaryFile ( mode = ' w ' , dir = config_dir , delete = False ) as tmpfile :
with NamedTemporaryFile ( mode = ' w ' , dir = str ( config_dir ) , delete = False ) as tmpfile :
config . write ( tmpfile )
config . write ( tmpfile )
# Move the new config file into place atomically
# Move the new config file into place atomically
if os . path . getsize ( tmpfile . name ) > 0 :
if os . path . getsize ( tmpfile . name ) > 0 :
os . rename ( tmpfile . name , self . config_file )
os . rename ( tmpfile . name , str ( self . config_file ) )
else :
else :
self . log . warning ( ' Config file saving failed, not replacing %s with %s . ' , self . config_file , tmpfile . name )
self . log . warning ( ' Config file saving failed, not replacing %s with %s . ' , str ( self . config_file ) , tmpfile . name )
# Housekeeping
self . release_lock ( )
self . release_lock ( )
cli . log . info ( ' Wrote configuration to %s ' , shlex . quote ( self . config_file ) )
cli . log . info ( ' Wrote configuration to %s ' , shlex . quote ( str ( self . config_file ) ) )
def __call__ ( self ) :
def __call__ ( self ) :
""" Execute the entrypoint function.
""" Execute the entrypoint function.
@ -603,16 +610,11 @@ class MILC(object):
name = handler . __name__ . replace ( " _ " , " - " )
name = handler . __name__ . replace ( " _ " , " - " )
self . acquire_lock ( )
self . acquire_lock ( )
kwargs [ ' help ' ] = description
kwargs [ ' help ' ] = description
self . subcommands_default [ name ] = self . _subparsers_default . add_parser ( name , * * kwargs )
self . subcommands [ name ] = SubparserWrapper ( self , name , self . _subparsers . add_parser ( name , * * kwargs ) )
self . subcommands [ name ] = SubparserWrapper ( self , name , self . _subparsers . add_parser ( name , * * kwargs ) )
self . subcommands [ name ] . set_defaults ( entrypoint = handler )
self . subcommands [ name ] . set_defaults ( entrypoint = handler )
if name not in self . __dict__ :
self . __dict__ [ name ] = self . subcommands [ name ]
else :
self . log . debug ( " Could not add subcommand ' %s ' to attributes, key already exists! " , name )
self . release_lock ( )
self . release_lock ( )
return handler
return handler
@ -620,7 +622,6 @@ class MILC(object):
def subcommand ( self , description , * * kwargs ) :
def subcommand ( self , description , * * kwargs ) :
""" Decorator to register a subcommand.
""" Decorator to register a subcommand.
"""
"""
def subcommand_function ( handler ) :
def subcommand_function ( handler ) :
return self . add_subcommand ( handler , description , * * kwargs )
return self . add_subcommand ( handler , description , * * kwargs )
@ -644,9 +645,9 @@ class MILC(object):
self . log_format = self . config [ ' general ' ] [ ' log_fmt ' ]
self . log_format = self . config [ ' general ' ] [ ' log_fmt ' ]
if self . config . general . color :
if self . config . general . color :
self . log_format = ANSIEmojiLoglevelFormatter ( self . args . general_ log_fmt, self . config . general . datetime_fmt )
self . log_format = ANSIEmojiLoglevelFormatter ( self . args . log_fmt , self . config . general . datetime_fmt )
else :
else :
self . log_format = ANSIStrippingFormatter ( self . args . general_ log_fmt, self . config . general . datetime_fmt )
self . log_format = ANSIStrippingFormatter ( self . args . log_fmt , self . config . general . datetime_fmt )
if self . log_file :
if self . log_file :
self . log_file_handler = logging . FileHandler ( self . log_file , self . log_file_mode )
self . log_file_handler = logging . FileHandler ( self . log_file , self . log_file_mode )
@ -673,13 +674,9 @@ class MILC(object):
colorama . init ( )
colorama . init ( )
self . parse_args ( )
self . parse_args ( )
self . read _config( )
self . merge_args_into _config( )
self . setup_logging ( )
self . setup_logging ( )
if ' save_config ' in self . config . general and self . config . general . save_config :
self . save_config ( )
exit ( 0 )
return self
return self
def __exit__ ( self , exc_type , exc_val , exc_tb ) :
def __exit__ ( self , exc_type , exc_val , exc_tb ) :