Pberndt V4

Direkt zum Inhalt springen


Quellcode unpack.py

Beschreibung

Ein Script zum Entpacken diverser Dateiformate.

Sourcecode

#!/usr/bin/python
# vim:noexpandtab:ts=8:sw=8
#
# Unpacking script
# Copyright (c) 2008-2014, Phillip Berndt
#
# No documentation required, I guess. Just run `unpack -h` for
# command line stuff. This script is very simple. If you extend
# it by adding support for more file types or such, please contact
# me and I will update this version of the script according to your
# changes :)
#
# Licensed under GPL.
#
# Todo:
#  - Support for shar archives (without actually running the script)
#  - ISO and MSI support without 7z
#
# Changelog:
#  - 29.04.2018
#    Fix debian packages (control.tar.* support)
#  - 06.10.2016
#    Support paxel in addition to axel
#  - 04.10.2016
#    Add zstandard support
#  - 16.12.2014
#    Fixed RPM support (Correctly skip header, LZMA archive through recursive unpack invocation)
#  - 27.02.2014
#    If axel fails, retry download with more reliable tool
#  - 18.02.2014
#    squashfs support
#  - 24.01.2014
#    Added libarchive support
#    Fixed .gz file extension
#    Fixed a bug where the script tried to delete directories which look related to an archive
#  - 14.05.2013
#    Fixed xz support
#  - 12.05.2013
#    Replaced generator by more flexible approach using environment variables
#    Added bz2 and gz support
#  - 08.05.2013
#    Fixed CPIO and RPM support
#  - 17.04.2013
#    Support for .tar.xz in deb archives
#  - 12.12.2012
#    Support for .tar.xz
#  - 06.12.2012
#    7z command name updated
#  - 07.11.2012
#    Added options for target specification and passing-through to extractors
#  - 21.10.2012
#    fixed .zip mime-type
#  - 03.07.2012
#    7z was renamed
#  - 04.04.2011
#    Improved detection of related files
#  - 17.09.2010
#    Added DEB file compression type detection
#    Rewrote shell-escaping
#  - 04.05.2010
#    Added LZMA support
#  - 08.09.2009
#    Removing other file parts as well
#  - 21.07.2009
#    Fixed a bug in filetype autodetection
#  - 26.02.2008
#    Added multiple unpack options for DEB archives
#  - 22.02.2008
#    Changed &> to > due to incompatibility to debian's sh
#  - 27.01.2008
#    ISO/MSI support via p7z
#    ZIP uses Python module "zip" if unzip is not found
#  - 15.01.2008
#    Added support for MIME email messages
#    Improved class design
#  - 14.01.2008
#    Fixed a forget-to-kill-debugging-stuff bug
#    RPM uses cpio directly and no longer rpm2tbz
#    Makeself support improved
#  - 12.01.2008
#    Added support for Makeself and deb
#    Uses `file` to guess file type now
#    Neater output when extracting multiple files
#    Fixed a bug in RPM code
#  - 09.01.2008
#    Added support for more file formats and SCP transmission
#    Fixed subdirectory stuff
#    Improved program design
#    If curl is not found the downloader will use wget instead or
#    fallback to python urllib
#  - 08.01.2008
#    Wrote initial version of the script. Enjoy!
#
from glob import glob
import getopt
import os
import os.path
import re
import struct
import subprocess
import sys

# Functions {{{
def se(parameter):
    """
        Quote <parameter> so that /bin/sh reads it as one literal word.

        The value is converted with str() and wrapped in double quotes.
        Every character which keeps a special meaning inside double
        quotes (backslash, double quote, dollar sign and backtick) is
        escaped with a backslash, so a file name can neither end the
        quoting nor trigger variable or command substitution.
    """
    return '"' + re.sub(r'([\\"$`])', r'\\\1', str(parameter)) + '"'
# }}}
# Define file types {{{
class filetype(object): # {{{
    """
        Base class for all unpackers. Derive one class per archive
        format from this one. Only direct subclasses will be taken
        into account!
    """
    PRIORITY = 0 # Lower is better
    # Regex to match against the end of the filename
    TYPE_DEF = ""
    # Regex to match against the start of the output of "file -NLbzi"
    TYPE_MIME_DEF = ""
    # Program(s) required on the host to unpack this format
    TYPE_REQUIRES = ""
    __slots__ = [ "file" ]

    def __init__(self, file):
        """
            Remember <file>, the archive which is to be unpacked.
        """
        self.file = file

    @classmethod
    def canHandle(cls, file):
        """
            Tell whether this class can handle <file> judging by its
            actual contents or similar. Any test the other methods
            can't express goes here. The base implementation always
            declines.
        """
        return False

    @classmethod
    def extensionMatches(cls, file):
        """
            Tell whether this class is the correct one to unpack <file>
            by matching TYPE_DEF against the end of the file name.
            Returns the match object (or None), or False if the class
            defines no TYPE_DEF.
        """
        if not cls.TYPE_DEF:
            return False
        return re.search(cls.TYPE_DEF + "$", file)

    @classmethod
    def mimeTypeMatches(cls, mimeType):
        """
            Tell whether this class is the correct one to unpack a file
            by matching TYPE_MIME_DEF against the start of <mimeType>
            (eg the output of "file -NLbzi <file>"). Returns the match
            object (or None), or False if the class defines no
            TYPE_MIME_DEF.
        """
        if not cls.TYPE_MIME_DEF:
            return False
        return re.search("^" + cls.TYPE_MIME_DEF, mimeType)

    @classmethod
    def toolsAvailable(cls):
        """
            Check whether the tools required by this class are
            available. Prints a warning and returns False if `which`
            does not find them.
        """
        required = cls.TYPE_REQUIRES
        if not required or os.system("which %s >/dev/null" % required) == 0:
            return True
        if " " in required:
            sout("warn", "Could unpack this file, but you don't have one of the required packages: %s" % required.replace(" ", ", "))
        else:
            sout("warn", "Could unpack this file, but you don't have %s installed" % required)
        return False

    def unpack(self, destination, **parameters):
        """
            Unpack self.file into the directory <destination>. Has to
            be overridden by every subclass; the base implementation
            only raises an Exception.
        """
        raise Exception("You need to override unpack")
# }}}
def defaultTypeGenerator(regexExtension, regexMime, program, command): # {{{
    """
        Generate a default implementation of the filetype-class for
        formats which can be unpacked by a single shell command.

        regexExtension, regexMime and program become TYPE_DEF,
        TYPE_MIME_DEF and TYPE_REQUIRES of the generated class. Its
        unpack method runs <command> through os.system() and hands the
        arguments over in environment variables: $FILE is the archive,
        $DEST the destination folder and $PARAMS the additional
        parameters for the extractor (empty if none were given).
    """
    class tmpcls(filetype):
        TYPE_DEF = regexExtension
        TYPE_MIME_DEF = regexMime
        TYPE_REQUIRES = program
        def unpack(cls, destination, **parameters):
            environment = (
                ("PARAMS", parameters.get("parameters", "")),
                ("FILE", cls.file),
                ("DEST", destination),
            )
            for name, value in environment:
                os.environ[name] = value
            exitStatus = os.system(command)
            return exitStatus == 0
    return tmpcls
# }}}


# Various file types
class ZIPFile(filetype): # {{{
    """
        ZIP and JAR archives. The unzip command line tool is used if it
        is installed, python's zipfile module otherwise.
    """
    TYPE_DEF = r"\.(?:zip|jar)"
    TYPE_MIME_DEF = "application/(?:x-)?zip"
    __slots__ = [ "USE_CMDLINE", "file" ]
    @classmethod
    def toolsAvailable(cls):
        """
            Decide which implementation to use and store the decision
            in USE_CMDLINE. Returns False if neither unzip nor the
            zipfile module is available.
        """
        if os.system("which unzip >/dev/null") == 0:
            cls.USE_CMDLINE = True
        else:
            cls.USE_CMDLINE = False
            try:
                import zipfile
            except ImportError:
                sout("warn", "You neither have the zipfile python extension nor unzip installed. Can't unpack ZIP archives.")
                return False
        return True
    def unpack(cls, destination, **parameters):
        """
            Unpack the archive into <destination>. The keyword argument
            "parameters" is passed on to unzip; the zipfile based
            implementation ignores it. Returns True on success.
        """
        params = parameters.get("parameters", "")
        if cls.USE_CMDLINE:
            return os.system("unzip %s %s -d %s" % (params, se(cls.file), se(destination))) == 0
        try:
            import zipfile
            fileObject = zipfile.ZipFile(cls.file)
            try:
                for nodeTitle in fileObject.namelist():
                    components = nodeTitle.split("/")
                    if ".." in components:
                        sout("warn", "Found suspicious filename in archive, ignoring '..': " + nodeTitle)
                    # Dropping empty, "." and ".." components turns absolute
                    # and parent-relative names into paths below destination
                    components = [ part for part in components if part not in ("", ".", "..") ]
                    if not components:
                        continue
                    print("/".join(components))
                    target = os.path.join(destination, *components)
                    isDirectory = nodeTitle.endswith("/")
                    if isDirectory:
                        targetDirectory = target
                    else:
                        targetDirectory = os.path.dirname(target)
                    if not os.path.isdir(targetDirectory):
                        os.makedirs(targetDirectory)
                    if not isDirectory:
                        # Archive members are binary data
                        targetFile = open(target, "wb")
                        try:
                            targetFile.write(fileObject.read(nodeTitle))
                        finally:
                            targetFile.close()
            finally:
                fileObject.close()
            return True
        except Exception:
            sout("bad", "Failed to extract ZIP archive")
        return False
# }}}
# File types which a single shell command can unpack. The commands read
# the archive name from $FILE, the target directory from $DEST and the
# additional extractor parameters from $PARAMS.
RARFile = defaultTypeGenerator(r"\.rar", "application/x-rar", "unrar", 'unrar x $PARAMS "$FILE" "$DEST"')
TARLZMAFile = defaultTypeGenerator(r"\.tar\.lzma", "application/x-tar.+application/x-lzma", "tar", 'tar x $PARAMS --lzma -C "$DEST" -f "$FILE"')
TBZFile = defaultTypeGenerator(r"\.(tbz|tar\.bz2?)", "application/x-tar.+application/x-bzip2", "tar", 'tar xj `echo $PARAMS` -C "$DEST" -f "$FILE"')
TGZFile = defaultTypeGenerator(r"\.(tgz|tar\.g?z)", "application/x-tar.+application/x-gzip", "tar", 'tar xz `echo $PARAMS` -C "$DEST" -f "$FILE"')
TXZFile = defaultTypeGenerator(r"\.tar\.xz", "application/x-tar.+application/x-xz", "tar", 'tar x `echo $PARAMS` --xz -C "$DEST" -f "$FILE"')
TARFile = defaultTypeGenerator(r"\.tar", "application/x-tar", "tar", 'tar x `echo $PARAMS` -C "$DEST" -f "$FILE"')
TARFZstile = defaultTypeGenerator(r"\.tar\.zst", "", "tar zstdcat", 'zstdcat "$FILE" | tar x `echo $PARAMS` -C "$DEST"')
CABFile = defaultTypeGenerator(r"\.cab", "^$", "cabextract", 'cabextract `echo $PARAMS` -d "$DEST" "$FILE"')
LHAFile = defaultTypeGenerator(r"\.lha", "application/x-lha", "lha", 'lha x `echo $PARAMS` -w="$DEST" "$FILE"')
ARJFile = defaultTypeGenerator(r"\.arj", "application/x-arj", "arj", 'arj x `echo $PARAMS` "$FILE" "$DEST"')
CPIOfile = defaultTypeGenerator(r"\.cpio", "application/x-cpio", "cpio", 'cat "$FILE" | (cd "$DEST" && cpio -i --no-absolute-filenames --make-directories `echo $PARAMS`)')
# The output name is the archive's base name without its suffix; $FILE is
# quoted so that names with spaces survive.
# NOTE(review): the name BZ2File is bound again by a class further down.
BZ2File = defaultTypeGenerator(r"\.bz2", "application/x-bzip2", "bzip2", 'cat "$FILE" | (cd "$DEST" && bzip2 -d `echo $PARAMS` > "$(basename "$FILE" .bz2)")')
GZFile = defaultTypeGenerator(r"\.gz", "application/x-gzip", "gzip", 'cat "$FILE" | (cd "$DEST" && gzip -d `echo $PARAMS` > "$(basename "$FILE" .gz)")')
# Use the first 7z flavour which is installed. If none is found the name
# stays at the last candidate, and toolsAvailable() of the classes below
# reports it as missing.
for sevenZExecutable in ("7z", "7za", "7zr"):
    if os.system("which " + sevenZExecutable + " >/dev/null 2>&1") == 0:
        break
SEVENZFile = defaultTypeGenerator(r"\.7z", "^$", sevenZExecutable, sevenZExecutable + ' x `echo $PARAMS` -o"$DEST" "$FILE"')
MSIFile = defaultTypeGenerator(r"\.msi", "", sevenZExecutable, sevenZExecutable + ' x `echo $PARAMS` -o"$DEST" "$FILE"')
ISOFile = defaultTypeGenerator(r"\.iso", "application/x-iso9660", sevenZExecutable, sevenZExecutable + ' x `echo $PARAMS` -o"$DEST" "$FILE"')
class DEBFile(filetype): # {{{
    """
        Debian packages. A package is an ar archive which holds the
        files to install in a data.tar.* member and the package
        metadata in a control.tar.* member.
    """
    TYPE_DEF = r"\.deb"
    TYPE_MIME_DEF = "application/x-debian-package"
    TYPE_REQUIRES = "ar tar"

    @staticmethod
    def _compressionFlag(memberName):
        """
            Return the tar option which decompresses the ar member
            <memberName>, judging by the end of its name. An empty
            string is returned for unknown or missing suffixes.
        """
        for suffix, flag in (("lzma", "--lzma"), ("bz2", "-j"), ("gz", "-z"), ("xz", "--xz"), ("zst", "--zstd")):
            if memberName.endswith(suffix):
                return flag
        return ""

    @staticmethod
    def _findMember(members, prefix):
        """
            Return the first name in <members> which starts with
            <prefix>, or an empty string if there is none.
        """
        for member in members:
            if member.startswith(prefix):
                return member
        return ""

    def unpack(cls, destination, **parameters):
        """
            Ask the user what to extract and unpack into <destination>:
            only the data archive, the data archive plus the control
            files (into a DEBIAN subdirectory), or the raw members of
            the ar archive. Returns True on success.
        """
        if parameters.get("parameters"):
            sout("warn", "DEB extraction does not support additional parameters")
        sout("question", "This is a debian package. Please choose:")
        answer = listQuery([ "Unpack package", "Unpack package and control files",
            "Unpack ar-archive" ], 1)
        if isinstance(answer, bool):
            # No choice was made (0 == False, hence the type check)
            return False
        members = [ line.strip() for line in os.popen("ar t %s" % se(cls.file)).readlines() ]
        members = [ member for member in members if member ]

        if answer == 2:
            # Unpack the ar-archive only
            try:
                for member in members:
                    destDir = os.path.join(destination, os.path.dirname(member))
                    if not os.path.isdir(destDir):
                        os.makedirs(destDir)
                    if os.system("ar p %s %s > %s/%s" %
                        (se(cls.file), se(member), se(destination), se(member))) != 0:
                        return False
                return True
            except (IOError, OSError):
                return False
        if answer not in (0, 1):
            return False

        dataFile = cls._findMember(members, "data.tar")
        if not dataFile:
            sout("bad", "This debian package does not contain a data archive")
            return False
        # The flags are inserted unquoted: an empty flag must not become
        # an empty argument, which tar would take for a member name
        unpackData = "ar p %s %s | tar x %s -C %s" % (se(cls.file), se(dataFile),
            cls._compressionFlag(dataFile), se(destination))
        if answer == 0:
            # Unpack data archive
            return os.system(unpackData) == 0

        # Unpack data & control archive
        controlFile = cls._findMember(members, "control.tar")
        if not controlFile:
            sout("bad", "This debian package does not contain a control archive")
            return False
        unpackControl = "mkdir %s/DEBIAN && ar p %s %s | tar x %s -C %s/DEBIAN" % (
            se(destination), se(cls.file), se(controlFile),
            cls._compressionFlag(controlFile), se(destination))
        return os.system("(%s) && (%s)" % (unpackData, unpackControl)) == 0
# }}}
class MAKESELFFile(filetype): # {{{
    """
        Makeself archives: a shell script with a compressed tar archive
        appended to it.
    """
    TYPE_DEF = r"\.run"
    TYPE_REQUIRES = ""
    # tar flag for each supported compression type named in the script
    _COMPRESSION_FLAGS = { b"Unix": "z", b"gzip": "z", b"bzip2": "j" }

    def unpack(cls, destination, **parameters):
        """
            Locate the tar archive behind the shell script and pipe it
            into tar, which unpacks it into <destination>. The keyword
            argument "parameters" is passed on to tar. Returns True if
            tar exited successfully.
        """
        params = parameters.get("parameters", "")
        if os.system("grep -q Makeself %s" % se(cls.file)) != 0:
            sout("bad", "This does not look like a Makeself archive")
            return False
        # The file mixes script text with binary data, so read bytes
        archive = open(cls.file, "rb")
        try:
            # Find first newline followed by a non-print character in the file
            # This should be the TAR archive
            headerParts = []
            while True:
                headerParts.append(archive.readline())
                byte = archive.read(1)
                if not byte:
                    sout("bad", "Binary data stream not found.")
                    return False
                headerParts.append(byte)
                if byte not in b"\t\r\n" and not 32 <= ord(byte) <= 126:
                    # Give the first byte of the archive back
                    archive.seek(-1, 1)
                    break
            scriptData = b"".join(headerParts)
            compressionType = (re.search(b'COMPRESS="([^"]+)"', scriptData)
                or re.search(br"Compression: (\S+)", scriptData))
            if compressionType is None:
                sout("bad", "Failed to extract compression type")
                return False
            flags = cls._COMPRESSION_FLAGS.get(compressionType.group(1))
            if flags is None:
                sout("bad", "This compression type is not supported right now.")
                return False
            # Unpack
            targetPipe = subprocess.Popen("tar %sx %s -C %s" % (flags, params, se(destination)), shell=True, stdin=subprocess.PIPE)
            try:
                try:
                    while True:
                        data = archive.read(65536)
                        if not data:
                            break
                        targetPipe.stdin.write(data)
                finally:
                    targetPipe.stdin.close()
            except IOError:
                # tar went away early; its exit status tells the story
                pass
            # Popen.wait() already returns the exit code
            return targetPipe.wait() == 0
        finally:
            archive.close()
# }}}
class RPMFile(filetype): # {{{
    """
        RPM packages. The file is parsed here to find the offset of the
        compressed cpio archive, which is then cut out with dd and
        unpacked with cpio.
    """
    TYPE_DEF = r"\.rpm"
    TYPE_MIME_DEF = "application/x-rpm"
    TYPE_REQUIRES = "cpio dd"
    def unpack(cls, destination, **parameters):
        """
            Unpack the payload of the package into <destination>.
            Additional parameters are not supported. Returns True on
            success.
        """
        if parameters.get("parameters"):
            sout("warn", "RPM extraction does not support additional parameters")
        # We parse the RPM ourselves
        # See http://www.rpm.org/max-rpm/s1-rpm-file-format-rpm-file-format.html for RPM file structure
        sout("good", "Searching rpm for archive file offset")
        archive = open(cls.file, "rb")
        try:
            archiveHead = archive.read(1024**2 * 2)
        finally:
            archive.close()
        # Check if this really is a RPM file
        if archiveHead[:4] != b"\xed\xab\xee\xdb":
            sout("bad", "This does not look like an RPM file")
            return False
        # Skip Lead
        offset = 96
        # Skip the Header structures
        header_count = 0
        while header_count < 2 and archiveHead[offset:offset + 3] == b"\x8e\xad\xe8":
            header_count += 1
            # Magic, one more byte and four more bytes precede the counters
            offset += 3 + 1 + 4
            counters = archiveHead[offset:offset + 8]
            if len(counters) < 8:
                sout("bad", "Failed to find archive offset")
                return False
            entries, length = struct.unpack(">II", counters)
            if header_count == 1:
                # Pad to multiplier of 8; undocumented, but the Perl module does this as well
                length += -length % 8
            offset += 8 + 16 * entries + length
        if header_count == 0:
            sout("bad", "Failed to find archive offset")
            return False
        # Unpack
        sout("good", "Found archive offset")
        # dd skips exactly one input block of <offset> bytes
        cutCommand = "dd ibs=%d skip=1 if=%s 2>/dev/null" % (offset, se(cls.file))
        cpioCommand = "(cd %s && cpio -i --no-absolute-filenames --make-directories)" % se(destination)
        magic = archiveHead[offset:offset + 6]
        if magic[:2] == b"\x1f\x8b":
            return os.system("%s | gzip -d | %s" % (cutCommand, cpioCommand)) == 0
        if magic == b"\xFD\x37\x7A\x58\x5A\x00":
            return os.system("%s | xz -d | %s" % (cutCommand, cpioCommand)) == 0
        sout("warn", "Unknown archive format: %r. Falling back to recursive unpack." % (archiveHead[offset:offset + 4],))
        temporary_archive = "%s/package.compressed" % destination
        os.system("%s > %s" % (cutCommand, se(temporary_archive)))
        return recursive_unpack(destination, temporary_archive)
# }}}
class MIMEFile(filetype): # {{{
    """
        MIME email messages. Unpacking stores every attachment of the
        mail which carries a file name.
    """
    TYPE_DEF = r"\.mime"
    TYPE_MIME_DEF = "message/rfc822"
    @classmethod
    def toolsAvailable(cls):
        """
            Check whether python's email parser can be imported.
        """
        try:
            import email.parser
            return True
        except ImportError:
            sout("warn", "You don't have email.Parser python extension installed. Can't unpack MIME archives.")
            return False
    def unpack(cls, destination, **parameters):
        """
            Write the attachments of the mail into <destination>. Only
            the base name of an attachment's file name is used.
            Additional parameters are not supported. Returns True on
            success and False if the mail is not multipart or can't be
            processed.
        """
        if parameters.get("parameters"):
            sout("warn", "MIME extraction does not support additional parameters")
        import email.parser
        try:
            mailFile = open(cls.file)
            try:
                mailData = email.parser.Parser().parse(mailFile)
            finally:
                mailFile.close()
            if not mailData.is_multipart():
                sout("warn", "This is no multipart message. E.g. there are no attachments to be extracted.")
                return False
            for part in mailData.walk():
                fileName = part.get_filename(False)
                if not fileName:
                    continue
                payload = part.get_payload(decode=True)
                if payload is None:
                    # Nothing decodable in this part
                    continue
                # Decoded payloads are binary data
                targetFile = open(os.path.join(destination, os.path.basename(fileName)), "wb")
                try:
                    targetFile.write(payload)
                finally:
                    targetFile.close()
            return True
        except Exception:
            sout("warn", "Looked like MIME mail but parsing the archive failed")
        return False
# }}}
class BZ2File(filetype): # {{{
    """
        Single files compressed with bzip2 (but not .tar.bz2 archives).
    """
    TYPE_DEF = r"(?<!\.tar)\.bz2"
    TYPE_MIME_DEF = "application/x-bzip2"
    __slots__ = [ "file" ]
    @classmethod
    def toolsAvailable(cls):
        """
            Check whether bzip2 is installed.
        """
        if os.system("which bzip2 >/dev/null") != 0:
            sout("warn", "You need bzip2 to decompress this file")
            return False
        return True
    def unpack(cls, destination, **parameters):
        """
            Copy the file into <destination> and decompress the copy
            there. The keyword argument "parameters" is passed on to
            bzip2. Returns True if bzip2 succeeded.
        """
        params = parameters.get("parameters", "")
        # After the cd the copy has to be addressed by its base name; the
        # original path would point at the wrong file or at the original
        copiedFile = os.path.join(".", os.path.basename(cls.file))
        return os.system("cp %s %s; cd %s && bzip2 %s -d %s" % (se(cls.file), se(destination), se(destination), params, se(copiedFile))) == 0
# }}}
class LZMAFile(filetype): # {{{
    """
        Single files compressed with lzma.
    """
    TYPE_DEF = r"\.lzma"
    TYPE_MIME_DEF = ""
    __slots__ = [ "file" ]
    @classmethod
    def toolsAvailable(cls):
        """
            Check whether lzma is installed.
        """
        if os.system("which lzma >/dev/null") != 0:
            sout("warn", "You need lzma to decompress this file")
            return False
        return True
    def unpack(cls, destination, **parameters):
        """
            Decompress the file into <destination>. The result is named
            like the archive without its directory and without the
            five character ".lzma" suffix. The keyword argument
            "parameters" is passed on to lzma. Returns True if lzma
            succeeded.
        """
        params = parameters.get("parameters", "")
        # Only the base name: the output is written relative to destination
        fnlzma = os.path.basename(cls.file)[:-5]
        return os.system("cat %s | (cd %s && lzma %s -d > %s)" % (se(cls.file), se(destination), params, se(fnlzma))) == 0
# }}}
class SquashFSFile(filetype): # {{{
    """
        Squashfs filesystem images, unpacked with unsquashfs.
    """
    TYPE_DEF = r"\.squashfs"
    TYPE_MIME_DEF = ""
    __slots__ = [ "file" ]
    @classmethod
    def canHandle(cls, file):
        """
            Ask the `file` utility about <file> and accept it if the
            answer mentions a Squashfs filesystem.
        """
        try:
            description = os.popen("file %s" % se(file)).read()
        except:
            return False
        return "Squashfs filesystem" in description
    @classmethod
    def toolsAvailable(cls):
        """
            Check whether unsquashfs is installed.
        """
        if os.system("which unsquashfs >/dev/null") != 0:
            sout("warn", "You need unsquashfs to decompress this file")
            return False
        return True
    def unpack(cls, destination, **parameters):
        """
            Unpack the image into <destination>. The keyword argument
            "parameters" is passed on to unsquashfs. Returns True if
            unsquashfs succeeded.
        """
        extraArguments = parameters.get("parameters", "")
        command = "unsquashfs %s -f -d %s %s" % (extraArguments, se(destination), se(cls.file))
        return os.system(command) == 0
# }}}
class ZStandard(filetype):# {{{
    """
        Single files compressed with zstd.
    """
    TYPE_DEF = r"\.zst"
    TYPE_MIME_DEF = ""
    TYPE_REQUIRES = "zstd"

    @classmethod
    def canHandle(cls, file):
        """
            Accept <file> if it starts with the four magic bytes
            28 B5 2F FD. Unreadable files are declined.
        """
        try:
            candidate = open(file, "rb")
            try:
                return candidate.read(4) == b"\x28\xB5\x2F\xFD"
            finally:
                candidate.close()
        except Exception:
            return False

    def unpack(cls, destination, **parameters):
        """
            Decompress the file into <destination>. The result is named
            like the archive without its directory and without its
            last extension. The keyword argument "parameters" is passed
            on to zstd. Returns True if zstd succeeded.
        """
        params = parameters.get("parameters", "")
        # Only the base name: an absolute archive path would otherwise
        # replace the destination in os.path.join
        targetName = os.path.splitext(os.path.basename(cls.file))[0]
        target = os.path.join(destination, targetName)
        return os.system("zstd %s -d -o %s %s" % (params, se(target), se(cls.file))) == 0
# }}}

# libarchive support {{{
libarchive = False
try:
    import ctypes
    libarchive = ctypes.CDLL("libarchive.so.13")
    libarchive.archive_read_new.restype = ctypes.c_void_p
    libarchive.archive_compression_name.restype = ctypes.c_char_p
    libarchive.archive_format_name.restype = ctypes.c_char_p
    libarchive.archive_error_string.restype = ctypes.c_char_p
    libarchive.archive_entry_pathname.restype = ctypes.c_char_p
except (ImportError, OSError, AttributeError):
    # Without a completely set up library there is no libarchive support
    libarchive = False
if libarchive:
    class LibArchiveHandler(filetype):
        """
            Fallback handler which unpacks whatever libarchive is able
            to open. It has a high PRIORITY value and is therefore
            sorted behind the handlers with the default priority.
        """
        PRIORITY = 99

        @staticmethod
        def _newReader():
            """
                Create a reader with all formats and filters enabled.
                The handle is wrapped in c_void_p: a plain python
                integer would be passed on as a C int and lose the
                upper half of a 64 bit pointer.
            """
            reader = ctypes.c_void_p(libarchive.archive_read_new())
            libarchive.archive_read_support_format_all(reader)
            libarchive.archive_read_support_filter_all(reader)
            return reader

        @staticmethod
        def _fileName(file):
            """
                Return <file> as the byte string which is handed to
                the library as a char pointer.
            """
            if isinstance(file, bytes):
                return file
            return file.encode(sys.getfilesystemencoding() or "utf-8")

        @classmethod
        def canHandle(cls, file):
            """
                Tell whether libarchive is able to open <file>.
            """
            r = cls._newReader()
            try:
                if libarchive.archive_read_open_filename(r, cls._fileName(file), 1024) != 0:
                    return False
                sout("good", "libarchive can handle this file. File type is %s/%s" % (libarchive.archive_format_name(r), libarchive.archive_compression_name(r)))
                return True
            finally:
                libarchive.archive_read_free(r)

        def unpack(cls, destination, **parameters):
            """
                Extract all entries of the archive into <destination>.
                Entries are extracted relative to the working
                directory, which is changed for the duration of the
                call and restored afterwards. Returns True on success.
            """
            r = cls._newReader()
            try:
                if libarchive.archive_read_open_filename(r, cls._fileName(cls.file), 1024) != 0:
                    sout("bad", "libarchive returned with an error: %s" % libarchive.archive_error_string(r))
                    return False
                oldcwd = os.getcwd()
                os.chdir(destination)
                try:
                    h = ctypes.c_void_p()
                    while libarchive.archive_read_next_header(r, ctypes.byref(h)) == 0:
                        print(libarchive.archive_entry_pathname(h))
                        if libarchive.archive_read_extract(r, h, 0) != 0:
                            sout("bad", "libarchive returned with an error: %s" % libarchive.archive_error_string(r))
                            return False
                    return True
                finally:
                    os.chdir(oldcwd)
            finally:
                libarchive.archive_read_free(r)
# }}}
# }}}
# Search for a matching file type {{{
def guessType(file):
    """
        Find the class which is able to unpack <file>.

        All direct subclasses of filetype are tried in order of
        ascending PRIORITY. A class matches if its canHandle method
        accepts the file, if its extension regex matches the lowercased
        file name or if its mime regex matches the output of
        "file -NLbzi" (which is skipped if `file` is not installed).
        The first matching class whose toolsAvailable check succeeds is
        returned; that check outputs a warning if a required program
        is missing. Returns False if no class is found.
    """
    loweredName = file.lower()
    mimeGuess = False
    if os.system("which file >/dev/null") == 0:
        mimeGuess = os.popen("file -NLbzi %s" % se(file)).read().strip()
    candidates = sorted(filetype.__subclasses__(), key=lambda candidate: candidate.PRIORITY)
    for candidate in candidates:
        matches = (candidate.canHandle(file)
            or candidate.extensionMatches(loweredName)
            or (mimeGuess and candidate.mimeTypeMatches(mimeGuess)))
        if matches and candidate.toolsAvailable():
            return candidate
    return False
# }}}
# Fancy output {{{
if sys.stdout.isatty():
    def sout(level, text):
        """
            Output text prepended with a bold '*' in green (good),
            yellow (warn), red (bad) or blue (question). Any other
            level uses the default color.
        """
        color = { 'good': 32, 'warn': 33, 'bad': 31, 'question': 34 }.get(level, 39)
        print("\x1b[%d;01m*\x1b[39;00m %s" % (color, text))
else:
    def sout(level, text):
        """
            Output text prepended with ' * '. The level is ignored
            because stdout is no terminal.
        """
        print(" *  %s" % (text,))
def askYesNo(question):
    """
        Ask the user a yes/no question.

        The prompt is repeated until a line consisting of "y" or "n"
        was read from stdin. Returns True for "y" and False for "n".
        If stdin ends before a valid answer was given, the answer is
        taken to be "n" instead of prompting forever.
    """
    while True:
        sys.stdout.write("  %s  [yn]: " % (question,))
        # The prompt has no newline, so push it out before reading
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # End of file: readline() returns an empty string from now on
            return False
        answer = line.strip()
        if answer in ("y", "n"):
            return answer == "y"
if not (sys.stdout.isatty() and sys.stdin.isatty()):
    def listQuery(choices, default = 0):
        """
            Use the default answer in a selection list
            (For non-interactive terminals)

            <choices> is a list or a dictionary of descriptions,
            <default> an index into the list or a key of the
            dictionary. Returns <default>.
        """
        if isinstance(choices, list):
            choices = dict(zip(range(len(choices)), choices))
        assert default in choices
        sout("good", "Assuming %s" % choices[default])
        return default
else:
    def listQuery(choices, default = 0):
        """
            Ask the user to select one of <choices>.

            <choices> is a list or a dictionary of descriptions,
            <default> an index into the list or a key of the
            dictionary. The choices are printed as a numbered list
            starting at 1. An empty answer selects <default>; answers
            which are no number of the list are asked for again.
            Returns the selected index or key, or False if the user
            aborted with Ctrl-C or stdin ended.
        """
        if isinstance(choices, list):
            choices = dict(zip(range(len(choices)), choices))
        assert default in choices
        if len(choices) == 1:
            return default
        selectIn = list(choices.keys())
        # Wide enough for the largest number which is displayed
        fieldLength = len(str(len(selectIn)))
        fmtString = " %%0%dd. %%s" % fieldLength
        for number, key in enumerate(selectIn):
            print(fmtString % (number + 1, choices[key]))
        try:
            readLine = raw_input
        except NameError:
            readLine = input
        while True:
            try:
                userChoice = readLine(">> ").strip()
            except (KeyboardInterrupt, EOFError):
                return False
            if not userChoice:
                return default
            try:
                number = int(userChoice)
            except ValueError:
                continue
            # 0 and negative numbers would index from the end of the list
            if 1 <= number <= len(selectIn):
                return selectIn[number - 1]
# }}}
# Recursive unpack {{{
def recursive_unpack(destination, temporary_archive):
    """
        Unpack <temporary_archive>, an archive found inside another
        archive, into <destination>.

        The archive is unpacked into a temporary subdirectory of
        <destination>. On success the archive is deleted, everything
        which was unpacked (hidden files included) is moved up into
        <destination> and the subdirectory is removed. Returns True on
        success and False if no handler is found or unpacking fails.
    """
    sout("good", "Recursive unpack of %s" % temporary_archive)
    file_type = guessType(temporary_archive)
    if not file_type:
        return False
    temporary_target = "%s/target" % destination
    os.mkdir(temporary_target)
    if not file_type(temporary_archive).unpack(temporary_target):
        return False
    os.unlink(temporary_archive)
    created = os.listdir(temporary_target)
    if "target" in created:
        # The archive holds an entry called "target" itself. Rename the
        # temporary directory so that the entry can take that name.
        alt_name = "target"
        while alt_name in created:
            alt_name += "_"
        renamed_target = "%s/%s" % (destination, alt_name)
        os.rename(temporary_target, renamed_target)
        temporary_target = renamed_target
    for name in created:
        os.rename(os.path.join(temporary_target, name), os.path.join(destination, name))
    os.rmdir(temporary_target)
    return True
# }}}
# Help message {{{
def helpMessage():
    """
        Print the usage summary: the command line options and the
        list of supported formats.
    """
    usage = "\n".join((
        "unpack.py",
        " generic unpacking utility like unp",
        " Copyright(c) 2008, Phillip Berndt",
        "",
        " Options:",
        "  -d    Specify target directory",
        "  -p    Pass parameters on to the extractor",
        "  -i    Ignore return value of extractor, assume it worked",
        "  -s    Keep extracted files in a subdirectory",
        "  -r    Remove archives after unpacking",
        "",
        " Supported formats:",
        "  zip, jar, rar, tar, 7z, bz2, gz, cab, lha, arj, rpm, cpio, run (Makeself), deb, MIME mails, squashfs, iso, lzma",
        "",
        "",
    ))
    print(usage)
# }}}
# Filename hooks {{{
# All registered hooks, in the order of their registration
fileNameHooks = []
def downloadHook(fkt):
    """
        Annotation tag which declares a function as a hook for file
        stuff (like downloading URLs etc)

        The function is appended to fileNameHooks and returned
        unchanged, so the decorated name keeps referring to it.
    """
    fileNameHooks.append(fkt)
    return fkt

@downloadHook
def hookURLDownload(file):
    """
        Download URLs using (p)axel/curl/wget or python urllib

        Anything which does not look like "scheme://..." is returned
        unchanged. A URL is downloaded into the working directory; an
        existing file is never overwritten, a numeric prefix is added
        to the name instead. Returns the name of the downloaded file,
        or False if the download failed.
    """
    if not re.match(r"^[a-zA-Z]+:\/\/.+", file):
        return file
    sout("good", "URL detected. Downloading")
    # URLs which end in a slash have no base name
    targetFile = os.path.basename(file) or "download"
    if os.access(targetFile, os.F_OK):
        counter = 0
        while os.access("%d-%s" % (counter, targetFile), os.F_OK):
            counter += 1
        targetFile = "%d-%s" % (counter, targetFile)
    success = False
    if os.system("which paxel >/dev/null") == 0:
        success = os.system("paxel -o %s %s" % (se(targetFile), se(file))) == 0
    elif os.system("which axel >/dev/null") == 0:
        # Axel sometimes fails when there are too many redirects, hence
        # the fallback to the other tools below.
        # NOTE(review): the original had a branch for a return value of
        # 2 ("User aborted") whose result was overwritten right away;
        # an abort is still retried with the next tool. Confirm intent.
        success = os.system("axel -a -o %s %s" % (se(targetFile), se(file))) == 0
    if not success:
        if os.system("which curl >/dev/null") == 0:
            success = os.system("curl -Lo %s %s" % (se(targetFile), se(file))) == 0
        elif os.system("which wget >/dev/null") == 0:
            success = os.system("wget -O %s %s" % (se(targetFile), se(file))) == 0
        else:
            try:
                try:
                    from urllib import urlretrieve
                except ImportError:
                    from urllib.request import urlretrieve
                urlretrieve(file, targetFile)
                success = True
            except (Exception, KeyboardInterrupt):
                success = False
    if not success:
        sout("bad", "Failed to download from URL.")
        # Do not leave a partial download behind
        if os.access(targetFile, os.W_OK):
            os.unlink(targetFile)
        return False
    return targetFile

@downloadHook
def hookSCPDownload(file):
    """
        Download using SCP

        A name is taken for a remote file if it has a colon before the
        first slash, e.g. "user@host.example.com:archive.zip", and no
        local file of that name exists. Everything else is returned
        unchanged. The file is copied into the working directory; an
        existing file is never overwritten, a numeric prefix is added
        to the name instead. Returns the name of the local copy, or
        False if the download failed.
    """
    if not re.match(r"^[^/:]+:.+", file) or os.path.exists(file):
        return file
    sout("good", "Foreign host detected. Using SCP to download file")
    targetFile = os.path.basename(file)
    if os.access(targetFile, os.F_OK):
        counter = 0
        while os.access("%d-%s" % (counter, targetFile), os.F_OK):
            counter += 1
        targetFile = "%d-%s" % (counter, targetFile)
    if os.system("scp %s %s" % (se(file), se(targetFile))) != 0:
        sout("bad", "Failed to download from foreign host.")
        # Do not leave a partial download behind
        if os.access(targetFile, os.W_OK):
            os.unlink(targetFile)
        return False
    return targetFile

# }}}

if __name__ == '__main__':
    # Main program {{{
    # Parse command line. Invalid options simply bring up the help text.
    try:
        (options, files) = getopt.getopt(sys.argv[1:], "hrsid:p:")
        options = dict(options)
    except getopt.GetoptError:
        options = { '-h': '' }
        files = []

    # Parameters which are passed through to the unpacker (-p)
    passOn = ""
    if "-p" in options:
        passOn = options["-p"]

    if '-h' in options or len(files) == 0:
        helpMessage()
        sys.exit(0)

    # cwd must be writeable
    if not os.access("./", os.W_OK):
        sout("bad", "Failed to unpack: ./ must be writeable.")
        sys.exit(1)

    # Files which look like further parts of archives removed through -r.
    # The user is asked about them once all archives are processed.
    removeFiles = set()

    # Unpack all files
    for fileNr, file in enumerate(files):
        if len(files) > 1:
            if fileNr > 0:
                # Blank line between the output for two archives
                sys.stdout.write("\n")
            sout("good", "[%d/%d] Processing %s" % (fileNr + 1, len(files), file))

        # Apply hooks. A hook returns False to abort processing of this
        # argument, or a different file name if it fetched the file. A
        # fetched file is only a temporary copy and is removed afterwards.
        removeBecauseIsHooked = False
        abortBecauseOfHook = False
        for hook in fileNameHooks:
            suggestion = hook(file)
            if suggestion is False:
                abortBecauseOfHook = True
                break
            if suggestion != file:
                file = suggestion
                removeBecauseIsHooked = True
                break
        if abortBecauseOfHook:
            continue

        # Download URLs. Only reached if no hook handled the URL before.
        if re.match(r"^[a-zA-Z]+:\/\/.+", file):
            sout("good", "URL detected. Downloading")
            targetFile = os.path.basename(file)
            if os.access(targetFile, os.F_OK):
                counter = 0
                while os.access("%d-%s" % (counter, targetFile), os.F_OK):
                    counter += 1
                targetFile = "%d-%s" % (counter, targetFile)
            if os.system("curl -Lo %s %s" % (se(targetFile), se(file))) != 0:
                sout("bad", "Failed to download from URL.")
                # curl might have failed before creating the file
                if os.access(targetFile, os.W_OK):
                    os.unlink(targetFile)
                continue
            else:
                file = targetFile
                removeBecauseIsHooked = True

        # Create unpacker
        if not os.access(file, os.R_OK):
            sout("bad", "Failed to open file")
            continue
        arctype = guessType(file)
        if not arctype:
            sout("bad", "Failed to find matching unpacking instructions")
            continue
        unpacker = arctype(file)

        # Create temporary unpacking directory, named after the archive
        # without its extension. If that name is taken, append ~<number>.
        destinationMatcher = re.search("^(.+)%s$" % arctype.TYPE_DEF, os.path.basename(file))
        if destinationMatcher:
            destination = destinationMatcher.group(1)
        else:
            destination = re.sub(r"\.[^\.]+$", "", os.path.basename(file))
        if os.access(destination, os.F_OK):
            counter = 0
            while os.access("%s~%d" % (destination, counter), os.F_OK):
                counter += 1
            destination = "%s~%d" % (destination, counter)
        os.mkdir(destination)

        # Unpack to that directory
        if not unpacker.unpack(destination, parameters=passOn):
            if "-i" in options:
                sout("warn", "I think unpacking failed. But I'll ignore that..")
            else:
                sout("bad", "Unpacking failed")
                os.system("rm -rf %s" % se(destination))
                continue

        # Check for subdirectories
        if '-d' in options:
            # Move everything into the specified directory, no matter what.
            # "<dir>/." is used instead of "<dir>/*" because the shell glob
            # skips dotfiles, which would then be lost through the rm below.
            target = os.path.abspath(options["-d"])
            if os.system("cp -r %s/. %s/" % (se(destination), se(target))) != 0:
                sout("warn", "Failed to move files to destination. Will not delete the temporary directory")
            else:
                os.system("rm -rf %s" % se(destination))
                destination = options["-d"]
        elif not '-s' in options:
            filesInDestination = os.listdir(destination)
            if len(filesInDestination) == 0:
                sout("warn", "Archive was empty")
                os.system("rmdir %s" % se(destination))
                continue
            if any(x.startswith(".") for x in filesInDestination):
                sout("warn", "Archive contains dotfiles. Will not move contents to ./")
            else:
                if len(filesInDestination) == 1:
                    # To solve problems where <archive>.ext contains only one subdirectory called <archive>:
                    # rename the temporary directory so that the name is free in ./
                    if destination in filesInDestination:
                        oldDestination = destination
                        counter = 0
                        while os.access("%s~%d" % (destination, counter), os.F_OK):
                            counter += 1
                        destination = "%s~%d" % (destination, counter)
                        os.system("mv %s %s" % (se(oldDestination), se(destination)))
                    # Check whether some unpacked files already exist in ./
                    if not any(os.access(d, os.F_OK) for d in filesInDestination):
                        # If not, move files to ./
                        os.system("mv %s/* ./" % se(destination))
                        os.system("rmdir %s" % se(destination))
                        destination = "./"
                    sout("good", "Extracted %s" % filesInDestination[0])
        # Remove archive
        if '-r' in options:
            if not os.access(file, os.W_OK):
                sout("warn", "-r supplied but no write access to archive")
            else:
                sout("good", "Removing archive")
                os.unlink(file)
                # Search for multifile archive parts: files next to the
                # archive whose name only differs in its numbers, or in the
                # last two letters of a rar/zip extension (r00, z01, ..)
                pattern = re.sub("[0-9]+", "[0-9]+", re.escape(os.path.basename(file)))
                pattern = re.compile(re.sub("(rar|zip)$", lambda x: x.group(1)[0] + ".{2}", pattern))
                for similarFile in glob(os.path.join(os.path.dirname(file), "*")):
                    if pattern.match(os.path.basename(similarFile)):
                        removeFiles.add(similarFile)
            # The file was either removed or kept on purpose
            removeBecauseIsHooked = False
        if removeBecauseIsHooked:
            os.unlink(file)

        # Done
        sout("good", "Done unpacking to %s" % destination)

    # Remove files which look as if they are part of a removed multi-file archive.
    # Only regular files which still exist are offered for deletion.
    removeFiles = [f for f in removeFiles if os.access(f, os.F_OK) and not os.path.isdir(f)]
    if removeFiles:
        sout("good", "These files look associated to removed files:")
        for sfile in removeFiles:
            sys.stdout.write("  - %s\n" % sfile)
        if askYesNo("Delete them?"):
            for sfile in removeFiles:
                os.unlink(sfile)
    # }}}

Download

Dateiname
unpack.py
Größe
31.82kb