Source code for engineer.commands.bundled

# coding=utf-8
from __future__ import absolute_import

import filecmp
import gzip
import os
from codecs import open

import arrow
from path import path
from feedgenerator import Rss201rev2Feed, Atom1Feed

from engineer.commands.core import ArgparseCommand
from engineer.util import mirror_folder, ensure_exists, slugify, relpath, compress, has_files, diff_dir


try:
    # noinspection PyPep8Naming
    import cPickle as pickle
except ImportError:
    import pickle

__author__ = 'Tyler Butler <tyler@tylerbutler.com>'


# noinspection PyShadowingBuiltins
class BuildCommand(ArgparseCommand):
    """
    Builds an Engineer site.

    .. seealso:: :ref:`engineer build`
    """
    name = 'build'
    help = 'Build the site.'

    def add_arguments(self):
        self.parser.add_argument('-c', '--clean',
                                 dest='clean',
                                 action='store_true',
                                 help="Clean the output directory and clear all the caches before building.")

        # An alternative to setting handler_function = build
        # self.parser.set_defaults(handle=self.build)

    # noinspection PyShadowingBuiltins
    @classmethod
    def build(cls, args=None):
        from engineer.conf import settings
        from engineer.filters import naturaltime
        from engineer.loaders import LocalLoader
        from engineer.log import get_file_handler
        from engineer.models import PostCollection, TemplatePage
        from engineer.themes import ThemeManager

        if args and args.clean:
            clean = CleanCommand(None)
            clean.clean()

        settings.create_required_directories()

        logger = cls.get_logger()
        logger.parent.addHandler(get_file_handler(settings.LOG_FILE))
        logger.debug("Starting build using configuration file %s." % settings.SETTINGS_FILE)

        build_stats = {
            'time_run': arrow.now(),
            'counts': {
                'template_pages': 0,
                'new_posts': 0,
                'cached_posts': 0,
                'rollups': 0,
                'tag_pages': 0,
            },
            'files': {},
        }

        # Remove the output cache (not the post cache or the Jinja cache)
        # since we're rebuilding the site
        settings.OUTPUT_CACHE_DIR.rmtree(ignore_errors=True)

        # Copy 'raw' content to output cache - first pass
        # This first pass ensures that any static content - JS/LESS/CSS - that
        # is needed by site-specific pages (like template pages) is available
        # during the build
        if settings.CONTENT_DIR.exists():
            mirror_folder(settings.CONTENT_DIR,
                          settings.OUTPUT_CACHE_DIR,
                          delete_orphans=False)

        theme = ThemeManager.current_theme()

        # Copy theme static content to output dir
        theme_output_dir = settings.OUTPUT_STATIC_DIR / 'theme'
        logger.debug("Copying theme static files to output cache.")
        theme.copy_content(theme_output_dir)
        logger.debug("Copied static files for theme to %s." % relpath(theme_output_dir))

        # Copy any theme additional content to output dir if needed
        if theme.content_mappings:
            logger.debug("Copying additional theme content to output cache.")
            theme.copy_related_content(theme_output_dir)
            logger.debug("Copied additional files for theme to %s." % relpath(theme_output_dir))

        # Load markdown input posts
        logger.info("Loading posts...")
        new_posts, cached_posts = LocalLoader.load_all(input=settings.POST_DIR)
        all_posts = PostCollection(new_posts + cached_posts)

        to_publish = PostCollection(all_posts.published)
        if settings.PUBLISH_DRAFTS:
            to_publish.extend(all_posts.drafts)
        if settings.PUBLISH_PENDING:
            to_publish.extend(all_posts.pending)
        if settings.PUBLISH_REVIEW:
            to_publish.extend(all_posts.review)

        if not settings.PUBLISH_PENDING and len(all_posts.pending) > 0:
            logger.warning("This site contains the following pending posts:")
            for post in all_posts.pending:
                logger.warning("\t'%s' - publish time: %s, %s." % (post.title,
                                                                   naturaltime(post.timestamp),
                                                                   post.timestamp_local))
            logger.warning("These posts won't be published until you build the site again after their publish time.")

        all_posts = PostCollection(
            sorted(to_publish, reverse=True, key=lambda p: p.timestamp))

        # Generate template pages
        if settings.TEMPLATE_PAGE_DIR.exists():
            logger.info("Generating template pages from %s." % settings.TEMPLATE_PAGE_DIR)
            template_pages = []
            for template in settings.TEMPLATE_PAGE_DIR.walkfiles('*.html'):
                # We create all the TemplatePage objects first so we have all of the URLs to them in the template
                # environment. Without this step, template pages might have broken links if they link to a page
                # that is loaded after them, since the URL to the not-yet-loaded page will be missing.
                template_pages.append(TemplatePage(template))
            for page in template_pages:
                rendered_page = page.render_item(all_posts)
                ensure_exists(page.output_path)
                with open(page.output_path / page.output_file_name, mode='wb', encoding='UTF-8') as the_file:
                    the_file.write(rendered_page)
                    logger.info("Output template page %s." % relpath(the_file.name))
                    build_stats['counts']['template_pages'] += 1
            logger.info("Generated %s template pages." % build_stats['counts']['template_pages'])

        # Generate individual post pages
        for post in all_posts:
            rendered_post = post.render_item(all_posts)
            ensure_exists(post.output_path)
            with open(post.output_path, mode='wb', encoding='UTF-8') as the_file:
                the_file.write(rendered_post)
                if post in new_posts:
                    logger.console("Output new or modified post '%s'." % post.title)
                    build_stats['counts']['new_posts'] += 1
                elif post in cached_posts:
                    build_stats['counts']['cached_posts'] += 1

        # Generate rollup pages
        num_posts = len(all_posts)
        if num_posts % settings.ROLLUP_PAGE_SIZE == 0:
            num_slices = num_posts / settings.ROLLUP_PAGE_SIZE
        else:
            num_slices = (num_posts / settings.ROLLUP_PAGE_SIZE) + 1

        slice_num = 0
        for posts in all_posts.paginate():
            slice_num += 1
            has_next = slice_num < num_slices
            has_previous = 1 < slice_num <= num_slices
            rendered_page = posts.render_listpage_html(slice_num, has_next, has_previous)
            ensure_exists(posts.output_path(slice_num))
            with open(posts.output_path(slice_num), mode='wb', encoding='UTF-8') as the_file:
                the_file.write(rendered_page)
                logger.debug("Output rollup page %s." % relpath(the_file.name))
                build_stats['counts']['rollups'] += 1

            # Copy first rollup page to root of site - it's the homepage.
            if slice_num == 1:
                path.copyfile(posts.output_path(slice_num),
                              settings.OUTPUT_CACHE_DIR / 'index.html')
                logger.debug("Output '%s'." % (settings.OUTPUT_CACHE_DIR / 'index.html'))

        # Generate archive page
        if num_posts > 0:
            archive_output_path = settings.OUTPUT_CACHE_DIR / 'archives/index.html'
            ensure_exists(archive_output_path)
            rendered_archive = all_posts.render_archive_html(all_posts)
            with open(archive_output_path, mode='wb', encoding='UTF-8') as the_file:
                the_file.write(rendered_archive)
                logger.debug("Output %s." % relpath(the_file.name))

        # Generate tag pages
        if num_posts > 0:
            tags_output_path = settings.OUTPUT_CACHE_DIR / 'tag'
            for tag in all_posts.all_tags:
                rendered_tag_page = all_posts.render_tag_html(tag, all_posts)
                tag_path = ensure_exists(tags_output_path / slugify(tag) / 'index.html')
                with open(tag_path, mode='wb', encoding='UTF-8') as the_file:
                    the_file.write(rendered_tag_page)
                    build_stats['counts']['tag_pages'] += 1
                    logger.debug("Output %s." % relpath(the_file.name))

        # Generate feeds
        rss_feed_output_path = ensure_exists(settings.OUTPUT_CACHE_DIR / 'feeds/rss.xml')
        atom_feed_output_path = ensure_exists(settings.OUTPUT_CACHE_DIR / 'feeds/atom.xml')

        rss_feed = Rss201rev2Feed(
            title=settings.FEED_TITLE,
            link=settings.SITE_URL,
            description=settings.FEED_DESCRIPTION,
            feed_url=settings.FEED_URL
        )

        atom_feed = Atom1Feed(
            title=settings.FEED_TITLE,
            link=settings.SITE_URL,
            description=settings.FEED_DESCRIPTION,
            feed_url=settings.FEED_URL
        )

        for feed in (rss_feed, atom_feed):
            for post in all_posts[:settings.FEED_ITEM_LIMIT]:
                title = settings.JINJA_ENV.get_template('core/feeds/title.jinja2').render(post=post)
                link = settings.JINJA_ENV.get_template('core/feeds/link.jinja2').render(post=post)
                content = settings.JINJA_ENV.get_template('core/feeds/content.jinja2').render(post=post)
                feed.add_item(
                    title=title,
                    link=link,
                    description=content,
                    pubdate=post.timestamp,
                    unique_id=post.absolute_url)

        with open(rss_feed_output_path, mode='wb') as the_file:
            rss_feed.write(the_file, 'UTF-8')
            logger.debug("Output %s." % relpath(the_file.name))

        with open(atom_feed_output_path, mode='wb') as the_file:
            atom_feed.write(the_file, 'UTF-8')
            logger.debug("Output %s." % relpath(the_file.name))

        # Generate sitemap
        sitemap_file_name = 'sitemap.xml.gz'
        sitemap_output_path = ensure_exists(settings.OUTPUT_CACHE_DIR / sitemap_file_name)
        sitemap_content = settings.JINJA_ENV.get_or_select_template(['sitemap.xml',
                                                                     'theme/sitemap.xml',
                                                                     'core/sitemap.xml']).render(post_list=all_posts)
        with gzip.open(sitemap_output_path, mode='wb') as the_file:
            the_file.write(sitemap_content)
            logger.debug("Output %s." % relpath(the_file.name))

        # Copy 'raw' content to output cache - second/final pass
        if settings.CONTENT_DIR.exists():
            mirror_folder(settings.CONTENT_DIR,
                          settings.OUTPUT_CACHE_DIR,
                          delete_orphans=False)

        # Compress all files marked for compression
        for the_file, compression_type in settings.COMPRESS_FILE_LIST:
            if the_file not in settings.COMPRESSION_CACHE:
                with open(the_file, mode='rb') as input:
                    output = compress(input.read(), compression_type)
                    logger.debug("Compressed %s." % relpath(the_file))
                settings.COMPRESSION_CACHE[the_file] = output
            else:
                logger.debug("Found pre-compressed file in cache: %s." % relpath(the_file))
                output = settings.COMPRESSION_CACHE[the_file]
            with open(the_file, mode='wb') as f:
                f.write(output)

        # Remove LESS files if LESS preprocessing is being done
        if settings.PREPROCESS_LESS:
            logger.debug("Deleting LESS files since PREPROCESS_LESS is True.")
            for f in settings.OUTPUT_STATIC_DIR.walkfiles(pattern="*.less"):
                logger.debug("Deleting file: %s." % relpath(f))
                f.remove_p()

        # Check if anything has changed other than the sitemap
        have_changes = False
        compare = filecmp.dircmp(settings.OUTPUT_CACHE_DIR, settings.OUTPUT_DIR, ignore=settings.OUTPUT_DIR_IGNORE)

        # The algorithm below takes advantage of the fact that once we've determined that there is more than one
        # file that's different, or if the first item returned by the generator is not the sitemap, then we can
        # break out of the generator loop early. This is also advantageous because it doesn't require us to
        # completely exhaust the generator. In the case of a fresh site build, for example, the generator will
        # return a lot more data. So the other approach here of expanding the generator into a list with a list
        # comprehension would be inefficient in many cases. This approach performs equally well in all cases at
        # the cost of some unusual-looking code.
        diff_file_count = 0
        if not has_files(settings.OUTPUT_DIR):
            have_changes = True
        else:
            for file_path in diff_dir(compare):
                diff_file_count += 1
                if file_path != sitemap_output_path:
                    have_changes = True
                    break
                if diff_file_count > 1:
                    have_changes = True
                    break

        if not have_changes:
            logger.console('')
            logger.console("No site changes to publish.")
        else:
            logger.debug("Synchronizing output directory with output cache.")
            build_stats['files'] = mirror_folder(settings.OUTPUT_CACHE_DIR,
                                                 settings.OUTPUT_DIR,
                                                 ignore_list=settings.OUTPUT_DIR_IGNORE)
            from pprint import pformat

            logger.debug("Folder mirroring report: %s" % pformat(build_stats['files']))
            logger.console('')
            logger.console("Site: '%s' output to %s." % (settings.SITE_TITLE, settings.OUTPUT_DIR))
            logger.console("Posts: %s (%s new or updated)" % (
                (build_stats['counts']['new_posts'] + build_stats['counts']['cached_posts']),
                build_stats['counts']['new_posts']))
            logger.console("Post rollup pages: %s (%s posts per page)" % (
                build_stats['counts']['rollups'], settings.ROLLUP_PAGE_SIZE))
            logger.console("Template pages: %s" % build_stats['counts']['template_pages'])
            logger.console("Tag pages: %s" % build_stats['counts']['tag_pages'])
            logger.console("%s new items, %s modified items, and %s deleted items." % (
                len(build_stats['files']['new']),
                len(build_stats['files']['overwritten']),
                len(build_stats['files']['deleted'])))

        logger.console('')
        logger.console("Full build log at %s." % settings.LOG_FILE)
        logger.console('')

        with open(settings.BUILD_STATS_FILE, mode='wb') as the_file:
            pickle.dump(build_stats, the_file)
        settings.CACHE.close()
        return have_changes, build_stats

    handler_function = build

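# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the change-detection
# loop in BuildCommand.build consumes the diff_dir generator only as far as it
# must before deciding. The same early-exit shape, reduced to plain values so
# it runs standalone; 'ignorable' stands in for the sitemap output path:
def _example_have_changes(diffs, ignorable):
    seen = 0
    for item in diffs:
        seen += 1
        if item != ignorable:
            return True  # a real change - stop without exhausting the generator
        if seen > 1:
            return True  # more than one differing file can't be just the sitemap
    return False

assert _example_have_changes(iter(['sitemap.xml.gz']), 'sitemap.xml.gz') is False
assert _example_have_changes(iter(['sitemap.xml.gz', 'index.html']), 'sitemap.xml.gz') is True
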
# noinspection PyShadowingBuiltins
class CleanCommand(ArgparseCommand):
    """
    Cleans an Engineer site's output directory and clears all caches.

    .. seealso:: :ref:`engineer clean`
    """
    name = 'clean'
    help = "Clean the output directory and clear all caches."

    # noinspection PyUnusedLocal
    def clean(self, args=None):
        from engineer.conf import settings

        logger = self.get_logger()

        # Expand the ignore list to be full paths
        ignore_list = [path(settings.OUTPUT_DIR / i).normpath() for i in settings.OUTPUT_DIR_IGNORE]
        ignore_dirs = [p for p in ignore_list if p.isdir()]
        ignore_files = []
        for the_dir in ignore_dirs:
            ignore_files.extend([f.normpath() for f in the_dir.walkfiles()])
        ignore_files.extend([p.normpath() for p in ignore_list if p.isfile()])

        # Delete all FILES that are not ignored
        if settings.OUTPUT_DIR.exists():
            for p in settings.OUTPUT_DIR.walkfiles():
                if p in ignore_files:
                    continue
                else:
                    p.remove()

        # Delete all directories with no files. All non-ignored files were already deleted, so every directory
        # except those that were ignored will be empty.
        for dirpath, dirnames, filenames in os.walk(settings.OUTPUT_DIR.normpath()):
            dirpath = path(dirpath)
            if dirpath != settings.OUTPUT_DIR:
                if not has_files(dirpath):
                    # no files under this entire path, so we can call rmtree
                    # noinspection PyArgumentList
                    dirpath.rmtree()
                    del dirnames[:]
                elif dirpath in ignore_list:
                    # we don't need to descend into the subdirs if this dir is in the ignore list
                    del dirnames[:]

        delete_paths = (
            settings.OUTPUT_DIR,
            settings.OUTPUT_CACHE_DIR,
            settings.CACHE_DIR,
        )
        for the_path in delete_paths:
            try:
                the_path.rmtree()
                logger.info("Deleted %s." % the_path)
            except OSError as we:
                if hasattr(we, 'winerror') and we.winerror not in (2, 3):
                    logger.exception(we.message)
                else:
                    logger.warning("Couldn't find output directory to delete: %s" % we.filename)

        logger.console('Cleaned output directory: %s' % settings.OUTPUT_DIR)

    handler_function = clean

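# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): CleanCommand stops
# os.walk from descending by deleting the dirnames slice in place. Rebinding
# the name (dirnames = []) would NOT work; os.walk only sees the mutation of
# the list it yielded. A minimal standalone version of the same idiom:
def _example_walk_pruned(root, skip_name):
    visited = []
    for dirpath, dirnames, filenames in os.walk(root):
        visited.append(dirpath)
        if skip_name in dirnames:
            dirnames.remove(skip_name)  # the pruned subtree is never yielded
    return visited
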
# noinspection PyShadowingBuiltins
class ServeCommand(ArgparseCommand):
    """
    Serves an Engineer site using a built-in development server.

    .. seealso:: :ref:`engineer serve`
    """
    name = 'serve'
    help = "Start the development server."

    def add_arguments(self):
        self.parser.add_argument('-p', '--port',
                                 type=int,
                                 default=8000,
                                 dest='port',
                                 help="The port the development server should listen on.")

    def handler_function(self, args=None):
        import bottle
        from engineer.conf import settings
        from engineer import emma

        logger = self.get_logger()

        if not settings.OUTPUT_DIR.exists():
            logger.warning("Output directory doesn't exist - did you forget to run 'engineer build'?")
            exit()

        debug_server = bottle.Bottle()
        debug_server.mount('/_emma', emma.Emma().app)

        # noinspection PyUnresolvedReferences,PyUnusedLocal
        @debug_server.route('/')
        @debug_server.route('/<filepath:path>')
        def serve_static(filepath='index.html'):
            if settings.HOME_URL != '/':
                # if HOME_URL is not root, we need to adjust the paths
                if filepath.startswith(settings.HOME_URL[1:]):
                    filepath = filepath[len(settings.HOME_URL) - 1:]
                else:
                    return bottle.HTTPResponse(status=404)
            response = bottle.static_file(filepath, root=settings.OUTPUT_DIR)
            if type(response) is bottle.HTTPError:
                return bottle.static_file(path(filepath) / 'index.html', root=settings.OUTPUT_DIR)
            else:
                return response

        bottle.debug(True)
        bottle.run(app=debug_server, host='0.0.0.0', port=args.port, reloader=True)

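# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the HOME_URL prefix
# handling inside serve_static above, isolated. Assumes HOME_URL starts and
# ends with '/' (e.g. '/blog/') and that bottle hands the handler a filepath
# with no leading slash:
def _example_adjust_path(filepath, home_url):
    if home_url == '/':
        return filepath
    if not filepath.startswith(home_url[1:]):
        return None  # the real handler returns a 404 response here
    return filepath[len(home_url) - 1:]

assert _example_adjust_path('blog/tag/python/index.html', '/blog/') == 'tag/python/index.html'
assert _example_adjust_path('other/page.html', '/blog/') is None
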
# noinspection PyShadowingBuiltins
class InitCommand(ArgparseCommand):
    """
    Initializes a new Engineer site in the current directory.

    .. seealso:: :ref:`engineer init`
    """
    name = 'init'
    help = "Initialize the current directory as an engineer site."
    need_settings = False

    def add_arguments(self):
        self.parser.add_argument('-m', '--mode',
                                 dest='mode',
                                 default='default',
                                 choices=['azure'],
                                 help="Initialize site with folder structures designed for deployment to a service "
                                      "such as Azure.")
        self.parser.add_argument('--sample',
                                 dest='sample',
                                 action='store_true',
                                 help="Include sample content.")
        self.parser.add_argument('--force', '-f',
                                 dest='force',
                                 action='store_true',
                                 help="Delete target folder contents. Use with caution!")

    def handler_function(self, args=None):
        from engineer import __file__ as package_file

        logger = self.get_logger()

        sample_site_path = path(package_file).dirname() / ('sample_site/%s' % args.mode)
        target = path.getcwd()

        if target.listdir() and not args.force:
            logger.warning("Target folder %s is not empty." % target)
            exit()
        elif args.force:
            logger.info("Deleting folder contents.")
            try:
                for item in target.dirs():
                    item.rmtree()
                for item in target.files():
                    item.remove()
            except Exception as e:
                logger.error("Couldn't delete folder contents - aborting.")
                logger.exception(e)
                exit()

        if args.sample:
            mirror_folder(sample_site_path, target)
        else:
            ensure_exists(target / 'posts')
            ensure_exists(target / 'content')
            ensure_exists(target / 'templates')
            mirror_folder(sample_site_path, target, recurse=False)

        logger.console("Initialization complete.")
        exit()

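# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): InitCommand's guard
# against scaffolding into a non-empty directory, reduced to the standard
# library. 'force' mirrors the --force flag declared above:
def _example_check_target(target_dir, force):
    if os.listdir(target_dir) and not force:
        raise SystemExit("Target folder %s is not empty." % target_dir)
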
# noinspection PyShadowingBuiltins
class EmmaCommand(ArgparseCommand):
    name = 'emma'
    help = "Start Emma, the built-in management server."

    def add_arguments(self):
        self.parser.add_argument('-p', '--port',
                                 type=int,
                                 default=8080,
                                 dest='port',
                                 help="The port Emma should listen on.")
        self.parser.add_argument('--prefix',
                                 type=str,
                                 dest='prefix',
                                 help="The prefix path the Emma site will be rooted at.")

        emma_options = self.parser.add_mutually_exclusive_group(required=True)
        emma_options.add_argument('-r', '--run',
                                  dest='run',
                                  action='store_true',
                                  help="Run Emma.")
        emma_options.add_argument('-g', '--generate',
                                  dest='generate',
                                  action='store_true',
                                  help="Generate a new secret location for Emma.")
        emma_options.add_argument('-u', '--url',
                                  dest='url',
                                  action='store_true',
                                  help="Get Emma's current URL.")

    def handler_function(self, args=None):
        from engineer import emma

        logger = self.get_logger()
        em = emma.EmmaStandalone()

        try:
            if args.prefix:
                em.emma_instance.prefix = args.prefix
            if args.generate:
                em.emma_instance.generate_secret()
                logger.console("New Emma URL: %s" % em.emma_instance.get_secret_path(True))
            elif args.url:
                logger.console("Current Emma URL: %s" % em.emma_instance.get_secret_path(True))
            elif args.run:
                em.run(port=args.port)
        except emma.NoSecretException:
            logger.warning("You haven't created a secret for Emma yet. Try 'engineer emma --generate' first.")
        exit()
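
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): EmmaCommand's flags
# use an argparse mutually-exclusive group with required=True, so exactly one
# of --run/--generate/--url must be supplied. A minimal standalone demo:
import argparse

_demo = argparse.ArgumentParser(prog='emma-demo')
_group = _demo.add_mutually_exclusive_group(required=True)
_group.add_argument('-r', '--run', action='store_true')
_group.add_argument('-g', '--generate', action='store_true')
_group.add_argument('-u', '--url', action='store_true')

assert _demo.parse_args(['--run']).run is True
# _demo.parse_args([]) or _demo.parse_args(['-r', '-g']) would exit with an error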