# A script for unpacking various file formats.
# vim:noexpandtab:ts=8:sw=8
# Unpacking script
# Copyright (c) 2008-2014, Phillip Berndt
# No documentation required, I guess. Just run `unpack -h` for
# command line stuff. This script is very simple. If you extend
# it by adding support for more file types or such, please contact
# me and I will update this version of the script according to your
# changes :)
# Licensed under GPL.
# Todo:
# - Support for shar archives (without actually running the script)
# - ISO and MSI support without 7z
# Changelog:
# - 29.04.2018
# Fix debian packages (control.tar.* support)
# - 06.10.2016
# Support paxel in addition to axel
# - 04.10.2016
# Add zstandard support
# - 16.12.2014
# Fixed RPM support (Correctly skip header, LZMA archive through recursive unpack invocation)
# - 27.02.2014
# If axel fails, retry download with more reliable tool
# - 18.02.2014
# squashfs support
# - 24.01.2014
# Added libarchive support
# Fixed .gz file extension
# Fixed a bug where the script tried to delete directories which look related to an archive
# - 14.05.2013
# Fixed xz support
# - 12.05.2013
# Replaced generator by more flexible approach using environment variables
# Added bz2 and gz support
# - 08.05.2013
# Fixed CPIO and RPM support
# - 17.04.2013
# Support for .tar.xz in deb archives
# - 12.12.2012
# Support for .tar.xz
# - 06.12.2012
# 7z command name updated
# - 07.11.2012
# Added options for target specification and passing-through to extractors
# - 21.10.2012
# fixed .zip mime-type
# - 03.07.2012
# 7z was renamed
# - 04.04.2011
# Improved detection of related files
# - 17.09.2010
# Added DEB file compression type detection
# Rewrote shell-escaping
# - 04.05.2010
# Added LZMA support
# - 08.09.2009
# Removing other file parts as well
# - 21.07.2009
# Fixed a bug in filetype autodetection
# - 26.02.2008
# Added multiple unpack options for DEB archives
# - 22.02.2008
# Changed &> to > due to incompatibility to debian's sh
# - 27.01.2008
# ISO/MSI support via p7z
# ZIP uses Python module "zip" if unzip is not found
# - 15.01.2008
# Added support for MIME email messages
# Improved class design
# - 14.01.2008
# Fixed a forget-to-kill-debugging-stuff bug
# RPM uses cpio directly and no longer rpm2tbz
# Makeself support improved
# - 12.01.2008
# Added support for Makeself and deb
# Uses `file` to guess file type now
# Neater output when extracting multiple files
# Fixed a bug in RPM code
# - 09.01.2008
# Added support for more file formats and SCP transmission
# Fixed subdirectory stuff
# Improved program design
# If curl is not found the downloader will use wget instead or
# fallback to python urllib
# - 08.01.2008
# Wrote initial version of the script. Enjoy!
from glob import glob
import getopt
import os
import os.path
import re
import struct
import subprocess
import sys
# Functions {{{
def se(parameter):
    """
    Shell-escape a value for interpolation into an ``os.system`` command.

    The value is converted with ``str()`` and wrapped in double quotes.
    Every character that keeps a special meaning inside double quotes in
    a POSIX shell (backslash, double quote, dollar sign and backtick) is
    prefixed with a backslash, so file names containing them can neither
    break the quoting nor trigger expansions.

    Returns the quoted string, including the surrounding double quotes.
    """
    return '"' + re.sub(r'([\\"$`])', r'\\\1', str(parameter)) + '"'
# }}}
# Define file types {{{
class filetype(object): # {{{
    """
    Default filetype base class. Derive all unpacker classes from this
    one. Only direct subclasses are taken into account when a handler is
    looked up.
    """
    PRIORITY = 0 # Lower is better

    # Regex to match against the end of the filename
    TYPE_DEF = ""
    # Regex to match against the output of "file -NLbzi"
    TYPE_MIME_DEF = ""
    # Program(s) required on the host to unpack this format, separated
    # by spaces. Empty means "no external tool needed".
    TYPE_REQUIRES = ""

    __slots__ = [ "file" ]

    def __init__(self, file):
        """
        Initialize the unpacker. The file parameter contains the path of
        the file to be unpacked.
        """
        self.file = file

    @classmethod
    def canHandle(cls, file):
        """
        Check if this class can handle the file based on its actual
        contents or similar. Any test the other matchers can't express
        goes here. The base implementation never matches.
        """
        return False

    @classmethod
    def extensionMatches(cls, file):
        """
        Check whether this class is the correct one to unpack <file> by
        comparing the file extension. Returns a match object or False.
        """
        if cls.TYPE_DEF:
            return re.search(cls.TYPE_DEF + "$", file)
        return False

    @classmethod
    def mimeTypeMatches(cls, mimeType):
        """
        Check whether this class is the correct one to unpack a file by
        comparing its mime type (e.g. "file -NLbzi <file>" output).
        Returns a match object or False.
        """
        # Without this guard an empty pattern would match every type
        if cls.TYPE_MIME_DEF:
            return re.search("^" + cls.TYPE_MIME_DEF, mimeType)
        return False

    @classmethod
    def toolsAvailable(cls):
        """
        Check whether the tools required by this class are available.
        Prints a warning and returns False if one is missing.
        """
        if cls.TYPE_REQUIRES and os.system("which %s >/dev/null" % cls.TYPE_REQUIRES) != 0:
            if cls.TYPE_REQUIRES.find(" ") > -1:
                sout("warn", "Could unpack this file, but you don't have one of the required packages: %s" % cls.TYPE_REQUIRES.replace(" ", ", "))
            else:
                sout("warn", "Could unpack this file, but you don't have %s installed" % cls.TYPE_REQUIRES)
            return False
        return True

    def unpack(self, destination, **parameters):
        """
        Called when a file needs to be unpacked. The parameter
        destination specifies into which directory the archive should
        be unpacked. Subclasses must override this method and return
        True on success.
        """
        raise Exception("You need to override unpack")
# }}}
def defaultTypeGenerator(regexExtension, regexMime, program, command): # {{{
    """
    Generate a default implementation of the filetype class.

    regexExtension and regexMime become TYPE_DEF and TYPE_MIME_DEF,
    program becomes TYPE_REQUIRES. unpack() runs the shell command
    `command` with three environment variables set:

        PARAMS  extra parameters passed on by the user (may be empty)
        FILE    the archive to unpack
        DEST    the directory to unpack into

    Returns the newly created class.
    """
    class tmpcls(filetype):
        TYPE_DEF = regexExtension
        TYPE_MIME_DEF = regexMime
        TYPE_REQUIRES = program

        def unpack(self, destination, **parameters):
            # Hand the values over through the child's environment only,
            # so the script's own environment stays untouched
            environment = dict(os.environ)
            environment["PARAMS"] = parameters.get("parameters") or ""
            environment["FILE"] = self.file
            environment["DEST"] = destination
            return subprocess.call(command, shell=True, env=environment) == 0
    return tmpcls
# }}}
# Various file types
class ZIPFile(filetype): # {{{
    """
    ZIP and JAR archives. Uses the unzip binary when it is installed
    and falls back to Python's zipfile module otherwise.
    """
    TYPE_DEF = r"\.(?:zip|jar)"
    TYPE_MIME_DEF = "application/(?:x-)?zip"
    # Decided by toolsAvailable(): True when unzip should be used
    USE_CMDLINE = False
    __slots__ = []

    @classmethod
    def toolsAvailable(cls):
        """
        Select the extraction backend. Returns False (after a warning)
        only if neither unzip nor the zipfile module is usable.
        """
        if os.system("which unzip >/dev/null") == 0:
            cls.USE_CMDLINE = True
        else:
            cls.USE_CMDLINE = False
            try:
                import zipfile
            except ImportError:
                sout("warn", "You neither have the zipfile python extension nor unzip installed. Can't unpack ZIP archives.")
                return False
        return True

    def unpack(self, destination, **parameters):
        """
        Unpack the archive into destination. Extra parameters are only
        honoured by the unzip backend. Returns True on success.
        """
        params = parameters.get("parameters") or ""
        if self.USE_CMDLINE:
            return os.system("unzip %s %s -d %s" % (params, se(self.file), se(destination))) == 0
        try:
            import zipfile
            fileObject = zipfile.ZipFile(self.file)
            try:
                for nodeTitle in fileObject.namelist():
                    node = nodeTitle
                    # Do not let members escape from the destination
                    if node.find("..") != -1:
                        sout("warn", "Found suspicious filename in archive, ignoring '..': " + node)
                        node = node.replace("..", ".")
                    if node.startswith("/"):
                        node = "./" + node
                    print(node)
                    targetDirectory = os.path.join(destination, os.path.dirname(node))
                    if not os.path.isdir(targetDirectory):
                        os.makedirs(targetDirectory)
                    # Names ending in a slash are directory entries
                    if not nodeTitle.endswith("/"):
                        targetFile = open(os.path.join(destination, node), "wb")
                        try:
                            targetFile.write(fileObject.read(nodeTitle))
                        finally:
                            targetFile.close()
            finally:
                fileObject.close()
            return True
        except Exception:
            sout("bad", "Failed to extract ZIP archive")
            return False
# }}}
# Types implemented through a single shell command. The command sees the
# environment variables PARAMS (extra user parameters), FILE (the
# archive) and DEST (the target directory). `echo $PARAMS` splits the
# parameters into words and expands to nothing if there are none.
RARFile = defaultTypeGenerator(r"\.rar", "application/x-rar", "unrar", "unrar x $PARAMS \"$FILE\" \"$DEST\"")
TARLZMAFile = defaultTypeGenerator(r"\.tar\.lzma", "application/x-tar.+application/x-lzma", "tar", "tar x $PARAMS --lzma -C \"$DEST\" -f \"$FILE\"")
TBZFile = defaultTypeGenerator(r"\.(tbz|tar\.bz2?)", "application/x-tar.+application/x-bzip2", "tar", "tar xj `echo $PARAMS` -C \"$DEST\" -f \"$FILE\"")
# Fixed: the variable was misspelled ($PARMAS), so user parameters were
# silently dropped for gzip compressed tarballs
TGZFile = defaultTypeGenerator(r"\.(tgz|tar\.g?z)", "application/x-tar.+application/x-gzip", "tar", "tar xz `echo $PARAMS` -C \"$DEST\" -f \"$FILE\"")
TXZFile = defaultTypeGenerator(r"\.tar\.xz", "application/x-tar.+application/x-xz", "tar", "tar x `echo $PARAMS` --xz -C \"$DEST\" -f \"$FILE\"")
TARFile = defaultTypeGenerator(r"\.tar", "application/x-tar", "tar", "tar x `echo $PARAMS` -C \"$DEST\" -f \"$FILE\"")
TARFZstile = defaultTypeGenerator(r"\.tar\.zst", "", "tar zstdcat", "zstdcat \"$FILE\" | tar x `echo $PARAMS` -C \"$DEST\"")
CABFile = defaultTypeGenerator(r"\.cab", "^$", "cabextract", "cabextract `echo $PARAMS` -d \"$DEST\" \"$FILE\"")
LHAFile = defaultTypeGenerator(r"\.lha", "application/x-lha", "lha", "lha x `echo $PARAMS` -w=\"$DEST\" \"$FILE\"")
ARJFile = defaultTypeGenerator(r"\.arj", "application/x-arj", "arj", "arj x `echo $PARAMS` \"$FILE\" \"$DEST\"")
CPIOfile = defaultTypeGenerator(r"\.cpio", "application/x-cpio", "cpio", "cat \"$FILE\" | (cd \"$DEST\" && cpio -i --no-absolute-filenames --make-directories `echo $PARAMS`)")
# The output name is quoted so archives with spaces in their name work,
# and nothing is written if changing into the destination fails
BZ2File = defaultTypeGenerator(r"\.bz2", "application/x-bzip2", "bzip2", "cat \"$FILE\" | (cd \"$DEST\" && bzip2 -d `echo $PARAMS` > \"$(basename \"$FILE\" .bz2)\")")
GZFile = defaultTypeGenerator(r"\.gz", "application/x-gzip", "gzip", "cat \"$FILE\" | (cd \"$DEST\" && gzip -d `echo $PARAMS` > \"$(basename \"$FILE\" .gz)\")")
# 7-Zip ships under several executable names. Use the first one found;
# if none is installed the last candidate is kept, so that
# toolsAvailable() can tell the user what is missing.
for sevenZExecutable in ("7z", "7za", "7zr"):
    if os.system("which " + sevenZExecutable + " >/dev/null 2>&1") == 0:
        break
SEVENZFile = defaultTypeGenerator(r"\.7z", "^$", sevenZExecutable, sevenZExecutable + " x `echo $PARAMS` -o\"$DEST\" \"$FILE\"")
MSIFile = defaultTypeGenerator(r"\.msi", "", sevenZExecutable, sevenZExecutable + " x `echo $PARAMS` -o\"$DEST\" \"$FILE\"")
ISOFile = defaultTypeGenerator(r"\.iso", "application/x-iso9660", sevenZExecutable, sevenZExecutable + " x `echo $PARAMS` -o\"$DEST\" \"$FILE\"")
class DEBFile(filetype): # {{{
    """
    Debian packages. A .deb file is an ar archive which contains a data
    tarball (the package contents) and a control tarball (the package
    meta data), both possibly compressed.
    """
    TYPE_DEF = r"\.deb"
    TYPE_MIME_DEF = "application/x-debian-package"
    TYPE_REQUIRES = "ar tar"

    @staticmethod
    def _tarFlag(memberName):
        """Return the tar option matching the compression of a member."""
        for suffix, flag in (("lzma", "--lzma"), ("bz2", "-j"), ("gz", "-z"), ("xz", "--xz"), ("zst", "--zstd")):
            if memberName.endswith(suffix):
                return flag
        return ""

    def _findMember(self, prefix):
        """Return the name of the first ar member starting with prefix."""
        for member in os.popen("ar t %s" % se(self.file)).readlines():
            member = member.strip()
            if member.startswith(prefix):
                return member
        return ""

    def unpack(self, destination, **parameters):
        """
        Ask the user what to extract (data only, data and control files,
        or the raw ar members) and unpack into destination. Returns True
        on success, False on failure or if the user aborted.
        """
        if parameters.get("parameters"):
            sout("warn", "DEB extraction does not support additional parameters")
        sout("question", "This is a debian package. Please choose:")
        answer = listQuery([ "Unpack package", "Unpack package and control files",
            "Unpack ar-archive" ], 1)
        # listQuery returns False if the user aborted; 0 is a valid answer
        if isinstance(answer, bool):
            return False

        if answer == 2:
            # Unpack the ar-archive only
            for member in os.popen("ar t %s" % se(self.file)).readlines():
                member = member.strip()
                if not member:
                    continue
                destDir = os.path.join(destination, os.path.dirname(member))
                if not os.access(destDir, os.F_OK):
                    os.makedirs(destDir)
                if os.system("ar p %s %s > %s/%s" %
                        (se(self.file), se(member), se(destination), se(member))) != 0:
                    return False
            return True

        dataFile = self._findMember("data.tar")
        if not dataFile:
            sout("bad", "Failed to find the data archive in this package")
            return False
        dataCommand = "ar p %s %s | tar x %s -C %s" % (
            se(self.file), se(dataFile), self._tarFlag(dataFile), se(destination))

        if answer == 0:
            # Unpack data archive
            return os.system(dataCommand) == 0
        elif answer == 1:
            # Unpack data & control archive
            controlFile = self._findMember("control.tar")
            if not controlFile:
                sout("bad", "Failed to find the control archive in this package")
                return False
            # The compression flag must not be quoted: an empty quoted
            # argument would be taken as a member name by tar
            controlCommand = "mkdir %s/DEBIAN && ar p %s %s | tar x %s -C %s/DEBIAN" % (
                se(destination), se(self.file), se(controlFile),
                self._tarFlag(controlFile), se(destination))
            return os.system("(%s) && (%s)" % (dataCommand, controlCommand)) == 0
        return False
# }}}
class MAKESELFFile(filetype): # {{{
    """
    Makeself self-extracting archives: a shell script followed by a
    (compressed) tar archive. The archive is extracted without running
    the embedded script.
    """
    TYPE_DEF = r"\.run"
    TYPE_REQUIRES = "tar"

    def unpack(self, destination, **parameters):
        """
        Locate the binary payload behind the shell script, determine its
        compression from the script text and pipe it into tar. Returns
        True on success.
        """
        params = parameters.get("parameters") or ""

        if os.system("grep -q Makeself %s" % se(self.file)) != 0:
            sout("bad", "This does not look like a Makeself archive")
            return False

        archive = open(self.file, "rb")
        try:
            # Find the first newline followed by a non-printable byte.
            # This should be the start of the TAR archive.
            scriptParts = []
            while True:
                line = archive.readline()
                scriptParts.append(line)
                byte = archive.read(1)
                if not byte:
                    sout("bad", "Binary data stream not found.")
                    return False
                if byte not in (b"\t", b"\r", b"\n") and (ord(byte) < 32 or ord(byte) > 126):
                    # Put the first payload byte back
                    archive.seek(-1, 1)
                    break
                scriptParts.append(byte)
            scriptData = b"".join(scriptParts)

            compressionType = re.search(b'COMPRESS="([^"]+)"', scriptData)
            if compressionType is None:
                compressionType = re.search(br"Compression: (\S+)", scriptData)
            if compressionType is None:
                sout("bad", "Failed to extract compression type")
                return False
            compressionType = compressionType.group(1)
            if compressionType in (b"Unix", b"gzip"):
                flags = "z"
            elif compressionType == b"bzip2":
                flags = "j"
            else:
                sout("bad", "This compression type is not supported right now.")
                return False

            # Unpack
            targetPipe = subprocess.Popen("tar %sx %s -C %s" % (flags, params, se(destination)), shell=True, stdin=subprocess.PIPE)
            try:
                while True:
                    data = archive.read(1024)
                    if not data:
                        break
                    targetPipe.stdin.write(data)
            except IOError:
                # tar went away early; its exit status tells the story
                pass
            targetPipe.stdin.close()
            # Popen.wait() already returns the exit code
            return targetPipe.wait() == 0
        finally:
            archive.close()
# }}}
class RPMFile(filetype): # {{{
    """
    RPM packages. The file is parsed by hand to find the offset of the
    compressed cpio payload, which is then extracted with dd and cpio.
    """
    TYPE_DEF = r"\.rpm"
    TYPE_MIME_DEF = "application/x-rpm"
    TYPE_REQUIRES = "cpio dd"

    def unpack(self, destination, **parameters):
        """
        Extract the payload of the package into destination. Returns
        True on success.
        """
        if parameters.get("parameters"):
            sout("warn", "RPM extraction does not support additional parameters")

        # We parse the RPM ourselves
        # See http://www.rpm.org/max-rpm/s1-rpm-file-format-rpm-file-format.html for RPM file structure
        sout("good", "Searching rpm for archive file offset")
        rpmFile = open(self.file, "rb")
        try:
            archiveHead = rpmFile.read(1024**2 * 2)
        finally:
            rpmFile.close()

        # Check if this really is a RPM file
        if archiveHead[:4] != b"\xed\xab\xee\xdb":
            sout("bad", "This does not look like an RPM file")
            return False

        # Skip Lead
        offset = 96

        # Skip the Header structures (signature and header)
        header_count = 0
        while archiveHead[offset:offset + 3] == b"\x8e\xad\xe8" and header_count < 2:
            # Magic (3 bytes), version (1 byte), reserved (4 bytes)
            offset += 3 + 1 + 4
            if len(archiveHead) < offset + 8:
                break
            header_count += 1
            entries, length = struct.unpack(">II", archiveHead[offset:offset + 8])
            if header_count == 1:
                # Pad to multiplier of 8; undocumented, but the Perl module does this as well
                length += -length % 8
            offset += 8 + 16 * entries + length

        if header_count != 2:
            sout("bad", "Failed to find archive offset")
            return False

        # Unpack
        sout("good", "Found archive offset")
        payloadCommand = "dd ibs=%d skip=1 if=%s 2>/dev/null | %%s | (cd %s && cpio -i --no-absolute-filenames --make-directories)" % (offset, se(self.file), se(destination))
        if archiveHead[offset:offset + 2] == b"\x1f\x8b":
            return os.system(payloadCommand % "gzip -d") == 0
        elif archiveHead[offset:offset + 6] == b"\xFD\x37\x7A\x58\x5A\x00":
            return os.system(payloadCommand % "xz -d") == 0

        sout("warn", "Unknown archive format: %r. Falling back to recursive unpack." % (archiveHead[offset:offset + 4]))
        temporary_archive = "%s/package.compressed" % destination
        os.system("dd ibs=%d skip=1 if=%s 2>/dev/null > %s" % (offset, se(self.file), se(temporary_archive)))
        return recursive_unpack(destination, temporary_archive)
# }}}
class MIMEFile(filetype): # {{{
    """
    MIME mail messages. Every attachment which carries a file name is
    written into the destination directory.
    """
    TYPE_DEF = r"\.mime"
    TYPE_MIME_DEF = "message/rfc822"

    @classmethod
    def toolsAvailable(cls):
        """Check that Python's email parser can be imported."""
        try:
            import email.parser
            return True
        except ImportError:
            sout("warn", "You don't have email.Parser python extension installed. Can't unpack MIME archives.")
            return False

    def unpack(self, destination, **parameters):
        """
        Write all named attachments of the message into destination.
        Returns True on success and False if the message has no
        attachments or cannot be parsed.
        """
        if parameters.get("parameters"):
            sout("warn", "MIME extraction does not support additional parameters")
        try:
            import email.parser
            mailFile = open(self.file)
            try:
                mailData = email.parser.Parser().parse(mailFile)
            finally:
                mailFile.close()
            if not mailData.is_multipart():
                sout("warn", "This is no multipart message. E.g. there are no attachments to be extracted.")
                return False
            for part in mailData.walk():
                fileName = part.get_filename(False)
                if not fileName:
                    continue
                payload = part.get_payload(decode=True)
                if payload is None:
                    # Container parts carry no data of their own
                    continue
                # basename() keeps attachments inside the destination
                targetFile = open("%s/%s" % (destination, os.path.basename(fileName)), "wb")
                try:
                    targetFile.write(payload)
                finally:
                    targetFile.close()
            return True
        except Exception:
            sout("warn", "Looked like MIME mail but parsing the archive failed")
            return False
# }}}
class BZ2File(filetype): # {{{
    """Single files compressed with bzip2 (but not .tar.bz2 archives)."""
    TYPE_DEF = r"(?<!\.tar)\.bz2"
    TYPE_MIME_DEF = "application/x-bzip2"
    __slots__ = []

    @classmethod
    def toolsAvailable(cls):
        """Check that bzip2 is installed."""
        if not os.system("which bzip2 >/dev/null") == 0:
            sout("warn", "You need bzip2 to decompress this file")
            return False
        return True

    def unpack(self, destination, **parameters):
        """
        Copy the file into destination and decompress the copy there.
        Returns True on success.
        """
        params = parameters.get("parameters") or ""
        # After the cd only the copy's base name is valid; using the
        # original path would decompress (and remove) the source file
        return os.system("cp %s %s && cd %s && bzip2 %s -d %s" % (
            se(self.file), se(destination), se(destination), params,
            se(os.path.basename(self.file)))) == 0
# }}}
class LZMAFile(filetype): # {{{
    """Single files compressed with lzma."""
    TYPE_DEF = r"\.lzma"
    __slots__ = []

    @classmethod
    def toolsAvailable(cls):
        """Check that lzma is installed."""
        if not os.system("which lzma >/dev/null") == 0:
            sout("warn", "You need lzma to decompress this file")
            return False
        return True

    def unpack(self, destination, **parameters):
        """
        Decompress the file into destination, dropping the ".lzma"
        suffix from its name. Returns True on success.
        """
        params = parameters.get("parameters") or ""
        # Only the base name: the output is written relative to the
        # destination directory
        fnlzma = os.path.basename(self.file)[:-5]
        return os.system("cat %s | (cd %s && lzma %s -d > %s)" % (se(self.file), se(destination), params, se(fnlzma))) == 0
# }}}
class SquashFSFile(filetype): # {{{
    """SquashFS file system images, extracted with unsquashfs."""
    TYPE_DEF = r"\.squashfs"
    __slots__ = []

    @classmethod
    def canHandle(cls, file):
        """Ask `file` whether this is a SquashFS image."""
        fileLine = os.popen("file %s" % se(file)).read()
        return "Squashfs filesystem" in fileLine

    @classmethod
    def toolsAvailable(cls):
        """Check that unsquashfs is installed."""
        if not os.system("which unsquashfs >/dev/null") == 0:
            sout("warn", "You need unsquashfs to decompress this file")
            return False
        return True

    def unpack(self, destination, **parameters):
        """Extract the image into destination. Returns True on success."""
        params = parameters.get("parameters") or ""
        # -f lets unsquashfs write into the already existing directory
        return os.system("unsquashfs %s -f -d %s %s" % (params, se(destination), se(self.file))) == 0
# }}}
class ZStandard(filetype):# {{{
    """Single files compressed with zstandard."""
    TYPE_DEF = r"\.zst"

    @classmethod
    def canHandle(cls, file):
        """Check for the zstandard frame magic at the start of the file."""
        try:
            handle = open(file, "rb")
        except (IOError, OSError):
            return False
        try:
            return handle.read(4) == b"\x28\xB5\x2F\xFD"
        finally:
            handle.close()

    def unpack(self, destination, **parameters):
        """
        Decompress the file into destination, dropping its extension.
        Returns True on success.
        """
        params = parameters.get("parameters") or ""
        # Only the base name: joining an absolute archive path would
        # discard the destination directory
        targetName = os.path.splitext(os.path.basename(self.file))[0]
        target = os.path.join(destination, targetName)
        return os.system("zstd %s -d -o %s %s" % (params, se(target), se(self.file))) == 0
# }}}
# libarchive support {{{
libarchive = False
try:
    import ctypes
    libarchive = ctypes.CDLL("libarchive.so.13")
    libarchive.archive_read_new.restype = ctypes.c_void_p
    libarchive.archive_compression_name.restype = ctypes.c_char_p
    libarchive.archive_format_name.restype = ctypes.c_char_p
    libarchive.archive_error_string.restype = ctypes.c_char_p
    libarchive.archive_entry_pathname.restype = ctypes.c_char_p
except (ImportError, OSError, AttributeError):
    # libarchive is optional; without it this handler is not registered
    libarchive = False

if libarchive:
    def _libarchiveOpen(fileName):
        """
        Create a libarchive reader for fileName with all formats and
        filters enabled. Returns (handle, status); status is 0 on
        success. The handle must be released with archive_read_free.
        """
        if not isinstance(fileName, bytes):
            fileName = fileName.encode(sys.getfilesystemencoding())
        # Wrap the handle: a plain integer would be passed as a C int
        # and truncate the pointer on 64 bit systems
        r = ctypes.c_void_p(libarchive.archive_read_new())
        libarchive.archive_read_support_filter_all(r)
        libarchive.archive_read_support_format_all(r)
        return r, libarchive.archive_read_open_filename(r, fileName, 1024)

    def _libarchiveText(value):
        """Convert a C string returned by libarchive into text."""
        if value is None:
            return ""
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode("utf-8", "replace")
        return value

    class LibArchiveHandler(filetype):
        """Fallback handler for everything libarchive can read."""

        @classmethod
        def canHandle(cls, file):
            """Check whether libarchive is able to open the file."""
            r, status = _libarchiveOpen(file)
            ret = status == 0
            if ret:
                sout("good", "libarchive can handle this file. File type is %s/%s" % (
                    _libarchiveText(libarchive.archive_format_name(r)),
                    _libarchiveText(libarchive.archive_compression_name(r))))
            libarchive.archive_read_free(r)
            return ret

        def unpack(self, destination, **parameters):
            """
            Extract all entries into destination. Returns True if every
            entry was extracted without an error.
            """
            success = True
            # Open before changing directory: the path may be relative
            r, status = _libarchiveOpen(self.file)
            if status != 0:
                sout("fail", "libarchive returned with an error: %s" % _libarchiveText(libarchive.archive_error_string(r)))
                libarchive.archive_read_free(r)
                return False
            # archive_read_extract writes relative to the working directory
            oldcwd = os.getcwd()
            os.chdir(destination)
            try:
                h = ctypes.c_void_p()
                while libarchive.archive_read_next_header(r, ctypes.pointer(h)) == 0:
                    print(_libarchiveText(libarchive.archive_entry_pathname(h)))
                    if libarchive.archive_read_extract(r, h, 0) != 0:
                        sout("fail", "libarchive returned with an error: %s" % _libarchiveText(libarchive.archive_error_string(r)))
                        success = False
                        break
            finally:
                os.chdir(oldcwd)
                libarchive.archive_read_free(r)
            return success
# }}}
# }}}
# Search for a matching file type {{{
def guessType(file):
    """
    Search all direct subclasses of filetype for the right one to
    handle file, in order of ascending PRIORITY.

    A class matches if its content check, its extension pattern or its
    mime type pattern (compared to the output of `file`, if installed)
    matches. A matching class whose required tools are missing prints a
    warning and is skipped.

    Returns the class, or False if none was found.
    """
    gfile = file.lower()
    fileTypeGuess = False
    if os.system("which file >/dev/null") == 0:
        fileTypeGuess = os.popen("file -NLbzi %s" % se(file)).read().strip()
    for cls in sorted(filetype.__subclasses__(), key=lambda i: i.PRIORITY):
        matches = (cls.canHandle(file)
            or cls.extensionMatches(gfile)
            or (fileTypeGuess and cls.mimeTypeMatches(fileTypeGuess)))
        if matches and cls.toolsAvailable():
            return cls
    return False
# }}}
# Fancy output {{{
if not sys.stdout.isatty():
    def sout(level, text):
        """
        Output text prepended with ' * '. The level is ignored because
        the output is not a terminal.
        """
        print(" *  %s" % text)
else:
    def sout(level, text):
        """
        Output text prepended with '*' in green (good), yellow (warn),
        red (bad) or blue (question). Any other level uses the
        terminal's default colour.
        """
        colors = { 'good': 32, 'warn': 33, 'bad': 31, 'question': 34 }
        color = colors.get(level, 39)
        print("\x1b[%d;01m*\x1b[39;00m %s" % (color, text))
def askYesNo(question):
    """
    Ask the user a yes/no question on the terminal and repeat it until
    the answer is "y" or "n".

    Returns True for "y" and False for "n". End of input counts as
    "n", so the function cannot loop forever on a closed stdin.
    """
    while True:
        sys.stdout.write("   %s  [yn]: " % question)
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # EOF: nobody is there to answer
            return False
        answer = line.strip()
        if answer in ("y", "n"):
            return answer == "y"
if not (sys.stdout.isatty() and sys.stdin.isatty()):
    def listQuery(choices, default = 0):
        """
        Use the default answer in a selection list
        (for non-interactive terminals).

        choices is a list or a dictionary of descriptions. Returns the
        default key.
        """
        if type(choices) is list:
            choices = dict(zip(range(len(choices)), choices))
        assert(default in choices)
        sout("good", "Assuming %s" % choices[default])
        return default
else:
    def listQuery(choices, default = 0):
        """
        Ask the user to select one entry of a list.

        choices is a list (the keys are the indexes) or a dictionary of
        descriptions. An empty answer selects default. Returns the key
        of the selected entry, or False if the user aborted.
        """
        if type(choices) is list:
            choices = dict(zip(range(len(choices)), choices))
        assert(default in choices)
        if len(choices) == 1:
            return default

        selectIn = list(choices.keys())
        fieldLength = len(str(len(selectIn)))
        fmtString = " %%0%dd. %%s" % fieldLength
        for number, key in enumerate(selectIn):
            print(fmtString % (number + 1, choices[key]))

        try:
            readLine = raw_input
        except NameError:
            readLine = input

        while True:
            try:
                userChoice = readLine(">> ").strip()
                if not userChoice:
                    return default
                index = int(userChoice) - 1
                # Reject 0 and negative numbers instead of counting
                # from the end of the list
                if 0 <= index < len(selectIn):
                    return selectIn[index]
            except ValueError:
                # Not a number, ask again
                pass
            except (KeyboardInterrupt, EOFError):
                return False
# }}}
# Recursive unpack {{{
def recursive_unpack(destination, temporary_archive):
    """
    Unpack temporary_archive, an intermediate archive found inside
    another one, and place its contents directly into destination.

    The archive is extracted into the scratch directory
    <destination>/target first. On success the contents are moved up
    into destination and the intermediate archive is removed.

    Returns True on success, False if the type is unknown or unpacking
    failed.
    """
    sout("good", "Recursive unpack of %s" % temporary_archive)
    file_type = guessType(temporary_archive)
    if not file_type:
        return False

    temporary_target = "%s/target" % destination
    if not os.path.isdir(temporary_target):
        os.mkdir(temporary_target)
    if not file_type(temporary_archive).unpack(temporary_target):
        return False

    created = os.listdir(temporary_target)
    if "target" in created:
        # The archive contains an entry called "target" itself: rename
        # the scratch directory so that the move below does not clash
        alt_name = "target"
        while alt_name in created:
            alt_name += "_"
        os.system("mv %s %s" % (se(temporary_target), se("%s/%s" % (destination, alt_name))))
        temporary_target = "%s/%s" % (destination, alt_name)
    os.system("mv %s/* %s" % (se(temporary_target), se(destination)))

    # Best-effort cleanup of the scratch directory and the archive
    try:
        os.rmdir(temporary_target)
    except OSError:
        pass
    try:
        os.unlink(temporary_archive)
    except OSError:
        pass
    return True
# }}}
# Help message {{{
def helpMessage():
    """
    Display the help message: the command line options and the list of
    supported formats.
    """
    print ("unpack.py\n"
        " generic unpacking utility like unp\n"
        " Copyright(c) 2008, Phillip Berndt\n\n"
        " Options:\n"
        " -d Specify target directory\n"
        " -p Pass parameters on to the extractor\n"
        " -i Ignore return value of extractor, assume it worked\n"
        " -s Keep extracted files in a subdirectory\n"
        " -r Remove archives after unpacking\n\n"
        " Supported formats:\n"
        " zip, jar, rar, tar, 7z, bz2, gz, cab, lha, arj, rpm, cpio, run (Makeself), deb, MIME mails, squashfs, iso, lzma")
# }}}
# Filename hooks {{{
# Functions which are applied to every file name given on the command
# line before unpacking, in order of registration
fileNameHooks = []
def downloadHook(fkt):
    """
    Decorator which declares a function as a hook for file stuff (like
    downloading URLs etc).

    A hook takes the file name and returns the name of the file to
    unpack: the unchanged name if it is not responsible, a new local
    name if it fetched the file, or False to abort processing.

    Registers fkt in fileNameHooks and returns it unchanged.
    """
    fileNameHooks.append(fkt)
    return fkt
@downloadHook
def hookURLDownload(file):
    """
    Download URLs using (p)axel/curl/wget or python urllib.

    Returns file unchanged if it is not a URL, the name of the
    downloaded local file on success and False if the download failed.
    """
    if not re.match(r"^[a-zA-Z]+:\/\/.+", file):
        return file

    sout("good", "URL detected. Downloading")
    # URLs ending in a slash have no base name
    targetFile = os.path.basename(file) or "download"
    if os.access(targetFile, os.F_OK):
        # Do not overwrite existing files
        counter = 0
        while os.access("%d-%s" % (counter, targetFile), os.F_OK):
            counter += 1
        targetFile = "%d-%s" % (counter, targetFile)

    success = False
    aborted = False
    if os.system("which paxel >/dev/null") == 0:
        paxelRet = os.system("paxel -o %s %s" % (se(targetFile), se(file)))
        success = paxelRet == 0
    elif os.system("which axel >/dev/null") == 0:
        axelRet = os.system("axel -a -o %s %s" % (se(targetFile), se(file)))
        if axelRet == 2:
            # User aborted
            aborted = True
        else:
            success = axelRet == 0

    if not success and not aborted:
        # Axel sometimes fails when there are too many redirects, so
        # retry with a more reliable tool
        if os.system("which curl >/dev/null") == 0:
            success = os.system("curl -Lo %s %s" % (se(targetFile), se(file))) == 0
        elif os.system("which wget >/dev/null") == 0:
            success = os.system("wget -O %s %s" % (se(targetFile), se(file))) == 0
        else:
            try:
                try:
                    from urllib import urlretrieve
                except ImportError:
                    from urllib.request import urlretrieve
                urlretrieve(file, targetFile)
                success = True
            except Exception:
                success = False

    if not success:
        sout("bad", "Failed to download from URL.")
        # Remove the partial download
        if os.access(targetFile, os.W_OK):
            os.unlink(targetFile)
        return False
    return targetFile
@downloadHook
def hookSCPDownload(file):
    """
    Download using SCP.

    Returns file unchanged if it does not look like host:path, the name
    of the downloaded local file on success and False if the download
    failed.
    """
    if not re.match(r"^[a-zA-Z]+:.+", file):
        return file

    sout("good", "Foreign host detected. Using SCP to download file")
    targetFile = os.path.basename(file)
    if os.access(targetFile, os.F_OK):
        # Do not overwrite existing files
        counter = 0
        while os.access("%d-%s" % (counter, targetFile), os.F_OK):
            counter += 1
        targetFile = "%d-%s" % (counter, targetFile)

    if os.system("scp %s %s" % (se(file), se(targetFile))) != 0:
        sout("bad", "Failed to download from foreign host.")
        # Remove the partial download
        if os.access(targetFile, os.W_OK):
            os.unlink(targetFile)
        return False
    return targetFile
# }}}
if __name__ == '__main__':
    # Main program {{{
    # Parse command line
    try:
        (options, files) = getopt.getopt(sys.argv[1:], "hrsid:p:")
        options = dict(options)
    except getopt.GetoptError:
        # Unknown option: show the help message
        options = { '-h': '' }
        files = []
    # Parameters which are passed through to the extractor
    passOn = options.get("-p", "")

    if '-h' in options or len(files) == 0:
        helpMessage()
        sys.exit(0)

    # cwd must be writeable
    if not os.access("./", os.W_OK):
        sout("bad", "Failed to unpack: ./ must be writeable.")
        sys.exit(1)

    # Files which look like further parts of removed multi-file archives
    removeFiles = set()

    # Unpack all files
    for fileNr, archive in enumerate(files):
        if len(files) > 1:
            if fileNr > 0:
                print("")
            sout("good", "[%d/%d] Processing %s" % (fileNr + 1, len(files), archive))

        # Apply hooks
        removeBecauseIsHooked = False
        abortBecauseOfHook = False
        for hook in fileNameHooks:
            suggestion = hook(archive)
            if suggestion is False:
                abortBecauseOfHook = True
                break
            if suggestion != archive:
                # The hook fetched a temporary local copy
                archive = suggestion
                removeBecauseIsHooked = True
        if abortBecauseOfHook:
            continue

        # Download URLs (only reached if no hook took care of it)
        if re.match(r"^[a-zA-Z]+:\/\/.+", archive):
            sout("good", "URL detected. Downloading")
            targetFile = os.path.basename(archive) or "download"
            if os.access(targetFile, os.F_OK):
                counter = 0
                while os.access("%d-%s" % (counter, targetFile), os.F_OK):
                    counter += 1
                targetFile = "%d-%s" % (counter, targetFile)
            if os.system("curl -Lo %s %s" % (se(targetFile), se(archive))) != 0:
                sout("bad", "Failed to download from URL.")
                continue
            archive = targetFile
            removeBecauseIsHooked = True

        # Create unpacker
        if not os.access(archive, os.R_OK):
            sout("bad", "Failed to open file")
            continue
        arctype = guessType(archive)
        if not arctype:
            sout("bad", "Failed to find matching unpacking instructions")
            continue
        unpacker = arctype(archive)

        # Create temporary unpacking directory, named after the archive
        # without its extension
        baseName = os.path.basename(archive)
        destinationMatcher = None
        if arctype.TYPE_DEF:
            # The type was matched on the lowercased name, so ignore case
            destinationMatcher = re.search("^(.+)%s$" % arctype.TYPE_DEF, baseName, re.I)
        if destinationMatcher:
            destination = destinationMatcher.group(1)
        else:
            destination = re.sub(r"\.[^\.]+$", "", baseName)
        if os.access(destination, os.F_OK):
            counter = 0
            while os.access("%s~%d" % (destination, counter), os.F_OK):
                counter += 1
            destination = "%s~%d" % (destination, counter)
        os.mkdir(destination)

        # Unpack to that directory
        if not unpacker.unpack(destination, parameters=passOn):
            if "-i" in options:
                sout("warn", "I think unpacking failed. But I'll ignore that..")
            else:
                sout("bad", "Unpacking failed")
                os.system("rm -rf %s" % se(destination))
                continue

        # Check for subdirectories
        if '-d' in options:
            # Move everything into the specified directory, no matter what
            target = os.path.abspath(options["-d"])
            if not os.path.isdir(target):
                os.makedirs(target)
            if os.system("cp -r %s/* %s/" % (se(destination), se(target))) != 0:
                sout("warn", "Failed to move files to destination. Will not delete the temporary directory")
            else:
                os.system("rm -rf %s" % se(destination))
                destination = options["-d"]
        elif not '-s' in options:
            filesInDestination = os.listdir(destination)
            if len(filesInDestination) == 0:
                sout("warn", "Archive was empty")
                os.system("rmdir %s" % se(destination))
            elif any(name.startswith(".") for name in filesInDestination):
                # "mv dir/*" would silently leave dotfiles behind
                sout("warn", "Archive contains dotfiles. Will not move contents to ./")
            elif len(filesInDestination) == 1:
                # To solve problems where <archive>.ext contains only one subdirectory called <archive>
                if destination in filesInDestination:
                    oldDestination = destination
                    counter = 0
                    while os.access("%s~%d" % (destination, counter), os.F_OK):
                        counter += 1
                    destination = "%s~%d" % (destination, counter)
                    os.system("mv %s %s" % (se(oldDestination), se(destination)))

                # Check whether some unpacked files already exist in ./
                if not any(os.access(name, os.F_OK) for name in filesInDestination):
                    # If not, move files to ./
                    os.system("mv %s/* ./" % se(destination))
                    os.system("rmdir %s" % se(destination))
                    destination = "./"
                    sout("good", "Extracted %s" % filesInDestination[0])

        # Remove archive
        if '-r' in options:
            if not os.access(archive, os.W_OK):
                sout("warn", "-r supplied but no write access to archive")
            else:
                sout("good", "Removing archive")
                os.unlink(archive)

                # Search for multifile archive parts: same name with
                # other numbers, or rar/zip part extensions (r00, z01..)
                pattern = re.sub("[0-9]+", "[0-9]+", re.escape(os.path.basename(archive)))
                pattern = re.compile(re.sub("(rar|zip)$", lambda x: x.group(1)[0] + ".{2}", pattern))
                # Look at the siblings of the archive
                siblings = glob(os.path.join(os.path.dirname(archive), "*"))
                for similarFile in siblings:
                    if pattern.match(os.path.basename(similarFile)):
                        removeFiles.add(similarFile)
                # The archive is gone already
                removeBecauseIsHooked = False
        if removeBecauseIsHooked:
            # Temporary copy which was created by a download hook
            os.unlink(archive)

        # Done
        sout("good", "Done unpacking to %s" % destination)

    # Remove files which look as if they are part of a removed multi-file archive
    # (never directories, and only what still exists)
    removeFiles = [ f for f in removeFiles if os.access(f, os.F_OK) and not os.path.isdir(f) ]
    if removeFiles:
        sout("good", "These files look associated to removed files:")
        for sfile in removeFiles:
            print(" - %s" % sfile)
        if askYesNo("Delete them?"):
            for sfile in removeFiles:
                os.unlink(sfile)
# }}}