#============================================================================
#Name : fileutils.py
#Part of : Helium
#Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
#All rights reserved.
#This component and the accompanying materials are made available
#under the terms of the License "Eclipse Public License v1.0"
#which accompanies this distribution, and is available
#at the URL "http://www.eclipse.org/legal/epl-v10.html".
#
#Initial Contributors:
#Nokia Corporation - initial contribution.
#
#Contributors:
#
#Description:
#===============================================================================
"""
File manipulation related functionalities:
* Filescanner
* rmtree (fixed version)
* move (fixed version)
"""
import codecs
import locale
import logging
import os
import re
import sys
import shutil
import hashlib
import subprocess
import string
import pathaddition.match
import stat
if os.name == 'nt':
import win32api
LOGGER = logging.getLogger('fileutils')
LOGGER_LOCK = logging.getLogger('fileutils.lock')
#LOGGER.addHandler(logging.FileHandler('default.log'))
#logging.basicConfig(level=logging.DEBUG)
#LOGGER.setLevel(logging.DEBUG)
class AbstractScanner(object):
""" This class implements all the required infrastructure for filescanning. """
def __init__(self):
""" Initialization. """
self.includes = []
self.excludes = []
self.includes_files = []
self.excludes_files = []
self.selectors = []
self.filetypes = []
def add_include(self, include):
""" Adds an include path to the scanner. """
if include.endswith('/') or include.endswith('\\'):
include = include + '**'
self.includes.append(include)
def add_exclude(self, exclude):
""" Adds an exclude path to the scanner. """
if exclude.endswith('/') or exclude.endswith('\\'):
exclude = exclude + '**'
self.excludes.append(exclude)
def add_exclude_file(self, exclude):
""" Adds an exclude file to the scanner. """
self.excludes_files.append(exclude)
def add_selector(self, selector):
""" Add selector to the scanner. """
self.selectors.append(selector)
def add_filetype(self, filetype):
""" Adds a filetype selection to the scanner. """
self.filetypes.append(filetype)
def is_included(self, path):
""" Returns if path is included by the scanner. """
LOGGER.debug("is_included: path = " + path)
if path.replace('\\', '/') in self.includes_files or path in self.includes_files:
return True
for inc in self.includes:
if self.match(path, inc):
LOGGER.debug("Included: " + path + " by " + inc)
return True
return False
def is_excluded(self, path):
""" Returns if path is excluded by the scanner. """
LOGGER.debug("is_excluded: path = " + path)
if path.replace('\\', '/') in self.excludes_files or path in self.excludes_files:
return True
for ex in self.excludes:
if self.match(path, ex):
LOGGER.debug("Excluded: " + path + " by " + ex)
return True
return False
def is_selected(self, path):
""" Returns if path is selected by all selectors in the scanner. """
LOGGER.debug("is_selected: path = " + path)
for selector in self.selectors:
if not selector.is_selected(path):
return False
LOGGER.debug("Selected: " + path)
return True
def is_filetype(self, path):
""" Test if a file matches one filetype. """
if len(self.filetypes) == 0:
return True
LOGGER.debug("is_filetype: path = " + path)
for filetype in self.filetypes:
if self.match(path, filetype):
LOGGER.debug("Filetype: " + path + " by " + filetype)
return True
return False
def match(self, filename, pattern):
""" Is filename matching pattern? """
return pathaddition.match.ant_match(filename, pattern, casesensitive=(os.sep != '\\'))
def test_path(self, root, relpath):
""" Test if a path matches filetype, include, exclude, and selection process."""
return self.is_filetype(relpath) and self.is_included(relpath) \
and not self.is_excluded(relpath) and \
self.is_selected(os.path.join(root, relpath))
def __str__(self):
""" Returns a string representing this instance. """
content = []
for inc in self.includes:
content.append('include:' + os.path.normpath(inc))
for ex in self.excludes:
content.append('exclude:' + os.path.normpath(ex))
return ';'.join(content)
def __repr__(self):
""" Returns a string representing this instance. """
return self.__str__()
def scan(self):
""" Abstract method which much be overriden to implement the scanning process. """
raise Exception("scan method must be overriden")
class FileScanner(AbstractScanner):
"""Scans the filesystem for files that match the selection paths.
The scanner is configured with a root directory. Any number of include
and exclude paths can be added. The scan() method is a generator that
returns matching files one at a time when called as an iterator.
This is a revisited implementation of the filescanner. It now relies on
the module pathaddition.match that implements a Ant-like regular expression matcher.
Rules:
- Includes and excludes should not start with *
- Includes and excludes should not have wildcard searches ending with ** (e.g. wildcard**)
Supported includes and excludes:
- filename.txt
- filename.*
- dir/
- dir/*
- dir/**
"""
def __init__(self, root_dir):
""" Initialization. """
AbstractScanner.__init__(self)
self.root_dir = os.path.normpath(root_dir)
if not self.root_dir.endswith(os.sep):
self.root_dir = self.root_dir + os.sep
# Add 1 so the final path separator is removed
#self.root_dirLength = len(self.root_dir) + 1
def scan(self):
""" Scans the files required to zip"""
#paths_cache = []
excludescopy = self.excludes[:]
for f_file in excludescopy:
if os.path.exists(os.path.normpath(os.path.join(self.root_dir, f_file))):
self.excludes_files.append(f_file)
self.excludes.remove(f_file)
includescopy = self.includes[:]
for f_file in includescopy:
if os.path.exists(os.path.normpath(os.path.join(self.root_dir, f_file))):
self.includes_files.append(f_file)
self.includes.remove(f_file)
LOGGER.debug('Scanning sub-root directories')
for root_dir in self.find_subroots():
for dirpath, subdirs, files in os.walk(unicode(root_dir)):
# Let's save the len before it's getting modified.
subdirsLen = len(subdirs)
subroot = dirpath[len(self.root_dir):]
dirs_to_remove = []
for subdir in subdirs:
if self.is_excluded(os.path.join(subroot, subdir)):
dirs_to_remove.append(subdir)
for dir_remove in dirs_to_remove:
subdirs.remove(dir_remove)
LOGGER.debug('Scanning directory: ' + dirpath)
for file_ in files:
path = os.path.join(subroot, file_)
if self.is_filetype(path) and self.is_included(path) and \
self.is_selected(os.path.join(dirpath, file_)) and not self.is_excluded(path):
ret_path = os.path.join(dirpath, file_)
yield ret_path
LOGGER.debug('Checking for empty directory: ' + dirpath)
# Check for including empty directories
if self.is_included(subroot) and not self.is_excluded(subroot):
if len(files) == 0 and subdirsLen == 0:
LOGGER.debug('Including empty dir: ' + dirpath)
yield dirpath
def find_subroots(self):
"""Finds all the subdirectory roots based on the include paths.
Often large archive operations define a number of archives from the root
of the drive. Walking the tree from the root is very time-consuming, so
selecting more specific subdirectory roots improves performance.
"""
def splitpath(path):
""" Returns the splitted path"""
return path.split(os.sep)
root_dirs = []
# Look for includes that start with wildcards.
subdirs_not_usable = False
for inc in self.includes + self.includes_files:
first_path_segment = splitpath(os.path.normpath(inc))[0]
if first_path_segment.find('*') != -1:
subdirs_not_usable = True
# Parse all includes for sub-roots
if not subdirs_not_usable:
for inc in self.includes + self.includes_files:
include = None
LOGGER.debug("===> inc %s" % inc)
contains_globs = False
for pathcomp in splitpath(os.path.normpath(inc)):
if pathcomp.find('*') != -1:
contains_globs = True
break
else:
if include == None:
include = pathcomp
else:
include = os.path.join(include, pathcomp)
if not contains_globs:
include = os.path.dirname(include)
LOGGER.debug("include %s" % include)
if include != None:
root_dir = os.path.normpath(os.path.join(self.root_dir, include))
is_new_root = True
for root in root_dirs[:]:
if destinsrc(root, root_dir):
LOGGER.debug("root contains include, skip it")
is_new_root = False
break
if destinsrc(root_dir, root):
LOGGER.debug("include contains root, so remove root")
root_dirs.remove(root)
if is_new_root:
root_dirs.append(root_dir)
if len(root_dirs) == 0:
root_dirs = [os.path.normpath(self.root_dir)]
LOGGER.debug('Roots = ' + str(root_dirs))
return root_dirs
def __str__(self):
return os.path.normpath(self.root_dir) + ';' + AbstractScanner.__str__(self)
def __repr__(self):
return self.__str__()
def move(src, dst):
"""Recursively move a file or directory to another location.
If the destination is on our current filesystem, then simply use
rename. Otherwise, copy src to the dst and then remove src.
A lot more could be done here... A look at a mv.c shows a lot of
the issues this implementation glosses over.
"""
try:
os.rename(src, dst)
except OSError:
if os.path.isdir(src):
if destinsrc(src, dst):
raise Exception, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
shutil.copytree(src, dst, symlinks=True)
rmtree(src)
else:
shutil.copy2(src, dst)
os.unlink(src)
def rmtree(rootdir):
""" Catch shutil.rmtree failures on Windows when files are read-only. Thanks Google!"""
if sys.platform == 'win32':
rootdir = os.path.normpath(rootdir)
if not os.path.isabs(rootdir):
rootdir = os.path.join(os.path.abspath('.'), rootdir)
if not rootdir.startswith('\\\\'):
rootdir = u"\\\\?\\" + rootdir
def cb_handle_error(fcn, path, excinfo):
""" Error handler, removing readonly and deleting the file. """
os.chmod(path, 0666)
if os.path.isdir(path):
rmdir(path)
elif os.path.isfile(path):
remove(path)
else:
fcn(path)
if 'java' in sys.platform:
import java.io
import org.apache.commons.io.FileUtils
f_file = java.io.File(rootdir)
org.apache.commons.io.FileUtils.deleteDirectory(f_file)
else:
return shutil.rmtree(rootdir, onerror=cb_handle_error)
def destinsrc(src, dst):
""" Fixed version of destinscr, that doesn't match dst with same root name."""
if os.sep == '\\':
src = src.lower()
dst = dst.lower()
src = os.path.abspath(src)
dst = os.path.abspath(dst)
if not src.endswith(os.path.sep):
src += os.path.sep
if not dst.endswith(os.path.sep):
dst += os.path.sep
return dst.startswith(src)
def which(executable):
""" Search for executable in the PATH."""
pathlist = os.environ['PATH'].split(os.pathsep)
pathexts = ['']
if os.sep == '\\':
pathexts = os.environ['PATHEXT'].split(os.pathsep)
for folder in pathlist:
for pathext in pathexts:
exename = executable
if os.sep == '\\' and not exename.lower().endswith(pathext.lower()):
exename = exename + pathext
filename = os.path.join(folder, exename)
try:
status = os.stat(filename)
except os.error:
continue
# Check if the path is a regular file
if stat.S_ISREG(status[stat.ST_MODE]):
mode = stat.S_IMODE(status[stat.ST_MODE])
if mode & 0111:
return os.path.normpath(filename)
return None
def read_policy_content(filename):
""" Read the policy number from the policy file.
strict allows to activate the new policy scanning.
"""
value = None
error = ""
try:
LOGGER.debug('Opening policy file: ' + filename)
policy_data = load_policy_content(filename)
match = re.match(r'^((?:\d+)|(?:0842[0-9a-zA-Z]{3}))\s*$', policy_data, re.M|re.DOTALL)
if match != None:
value = match.group(1)
else:
error = "Content of '%s' doesn't match r'^\d+|0842[0-9a-zA-Z]{3}\s*$'." % filename
except IOError, exc:
error = str(exc)
if value is not None:
return value
# worse case....
raise IOError(error)
def load_policy_content(filename):
""" Testing policy content loading. """
data = ''
try:
fileh = codecs.open(filename, 'r', 'ascii')
data = fileh.read()
except ValueError:
raise IOError("Error loading '%s' as an ASCII file." % filename)
finally:
fileh.close()
return data
ENCODING_MATRIX = {
codecs.BOM_UTF8: 'utf_8',
codecs.BOM_UTF16: 'utf_16',
codecs.BOM_UTF16_BE: 'utf_16_be',
codecs.BOM_UTF16_LE: 'utf_16_le',
}
def guess_encoding(data):
"""Given a byte string, guess the encoding.
First it tries for UTF8/UTF16 BOM.
Next it tries the standard 'UTF8', 'ISO-8859-1', and 'cp1252' encodings,
Plus several gathered from locale information.
The calling program *must* first call locale.setlocale(locale.LC_ALL, '')
If successful it returns (decoded_unicode, successful_encoding)
If unsuccessful it raises a ``UnicodeError``.
This was taken from http://www.voidspace.org.uk/python/articles/guessing_encoding.shtml
"""
for bom, enc in ENCODING_MATRIX.items():
if data.startswith(bom):
return data.decode(enc), enc
encodings = ['ascii', 'UTF-8']
successful_encoding = None
try:
encodings.append(locale.getlocale()[1])
except (AttributeError, IndexError):
pass
try:
encodings.append(locale.getdefaultlocale()[1])
except (AttributeError, IndexError):
pass
# latin-1
encodings.append('ISO8859-1')
encodings.append('cp1252')
for enc in encodings:
if not enc:
continue
try:
decoded = unicode(data, enc)
successful_encoding = enc
break
except (UnicodeError, LookupError):
pass
if successful_encoding is None:
raise UnicodeError('Unable to decode input data. Tried the'
' following encodings: %s.' %
', '.join([repr(enc) for enc in encodings if enc]))
else:
if successful_encoding == 'ascii':
# our default ascii encoding
successful_encoding = 'ISO8859-1'
return (decoded, successful_encoding)
def getmd5(fullpath, chunk_size=2**16):
""" returns the md5 value"""
for trial in range(3):
try:
file_handle = open(fullpath, "rb")
md5 = hashlib.md5()
while 1:
chunk = file_handle.read(chunk_size)
if not chunk:
break
md5.update(chunk)
file_handle.close()
except IOError, exc:
LOGGER.warning("Error happened on %d trial: %s" % (trial, str(exc)))
return md5.hexdigest()
def read_symbian_policy_content(filename):
""" Read the policy category from the policy file. """
value = None
error = ""
try:
LOGGER.debug('Opening symbian policy file: ' + filename)
try:
fileh = codecs.open(filename, 'r', 'ascii')
except ValueError:
raise IOError("Error loading '%s' as an ASCII file." % filename)
for line in fileh:
match = re.match(r'^Category\s+([A-Z])\s*$', line, re.M|re.DOTALL)
if match != None:
value = match.group(1)
fileh.close()
return value
fileh.close()
if match == None:
error = "Content of '%s' doesn't match r'^Category\s+([A-Z])\s*$'." % filename
except IOError, exc:
error = str(exc)
if value is not None:
return value
# worse case....
raise IOError(error)
class LockFailedException(Exception):
""" This class is used to indicate the failure in obtaining a Lock. """
pass
if os.name == 'nt':
import win32file
import win32con
import winerror
import time
import win32netcon
import win32wnet
class Lock:
""" This object implement file locking for windows. """
def __init__(self, filename):
LOGGER_LOCK.debug("__init__")
self._filename = filename
self.f_desc = None
def lock(self, wait=False):
"""lock the file"""
LOGGER_LOCK.debug("lock")
# Open the file
if self.f_desc == None:
self.f_desc = open(self._filename, "w+")
# pylint: disable=W0212
wfd = win32file._get_osfhandle(self.f_desc.fileno())
if not wait:
try:
win32file.LockFile(wfd, 0, 0, 0xffff, 0)
except:
raise LockFailedException()
else:
while True:
try:
win32file.LockFile(wfd, 0, 0, 0xffff, 0)
break
except win32file.error, exc:
if exc[0] != winerror.ERROR_LOCK_VIOLATION:
raise exc
LOGGER_LOCK.debug("waiting")
time.sleep(1)
def unlock(self):
"""unlock the file"""
LOGGER_LOCK.debug("unlock")
if self.f_desc == None:
LOGGER_LOCK.debug("already unlocked")
return
# pylint: disable=W0212
wfd = win32file._get_osfhandle(self.f_desc.fileno())
try:
# pylint: disable=E1101
win32file.UnlockFile(wfd, 0 , 0, 0xffff, 0)
self.f_desc.close()
self.f_desc = None
except win32file.error, exc:
if exc[0] != 158:
raise
def __del__(self):
LOGGER_LOCK.debug("__del__")
self.unlock()
def rmdir(path):
""" Catch os.rmdir failures on Windows when path is too long (more than 256 chars)."""
path = win32api.GetShortPathName(path)
win32file.RemoveDirectory(path)
def remove(filename):
""" Catch os.rmdir failures on Windows when path is too long (more than 256 chars)."""
filename = win32api.GetShortPathName(filename)
filename = filename.lstrip("\\\\?\\")
os.remove(filename)
def mount(drive, unc, username=None, password=None, persistent=False):
""" Windows helper function to map a network drive. """
flags = 0
if persistent:
flags = win32netcon.CONNECT_UPDATE_PROFILE
win32wnet.WNetAddConnection2(win32netcon.RESOURCETYPE_DISK, drive, unc, None, username, password, flags)
def umount(drive):
""" Windows helper function to map a network drive. """
drive_type = win32file.GetDriveType(drive)
if drive_type == win32con.DRIVE_REMOTE:
win32wnet.WNetCancelConnection2(drive, win32netcon.CONNECT_UPDATE_PROFILE, 1)
else:
raise Exception("%s couldn't be umount." % drive)
else:
def rmdir(path):
"""remove directory"""
return os.rmdir(path)
def remove(path):
"""remove the files and folders"""
return os.remove(path)
class Lock:
""" This class represents a dummy lock """
def __init__(self, filename):
pass
def lock(self, wait=False):
"""lock file - do nothing """
pass
def unlock(self):
"""un lock file -do nothing """
pass
if os.sep == '\\':
def get_next_free_drive():
""" Return the first free drive found else it raise an exception. """
if os.name == 'nt':
drive_labels = sorted(list(set(string.ascii_uppercase) - set(win32api.GetLogicalDriveStrings())), reverse=True)
if len(drive_labels) != 0 :
return drive_labels[0] + ":"
raise OSError("No free drive left.")
if 'java' in sys.platform:
import java.io
used = []
for _xx in java.io.File.listRoots():
used.append(str(_xx).replace(':\\', ''))
drive_labels = sorted(list(set(string.ascii_uppercase) - set(used)), reverse=True)
if len(drive_labels) != 0 :
return drive_labels[0] + ":"
raise OSError("No free drive left.")
def subst(drive, path):
""" Substing path as a drive. """
path = os.path.normpath(path)
p_subst = subprocess.Popen("subst %s %s" % (drive, path), shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
errmsg = p_subst.communicate()[0]
if p_subst.returncode != 0:
raise Exception("Error substing '%s' under '%s': %s" % (path, drive, errmsg))
def unsubst(drive):
""" Unsubsting the drive. """
p_subst = subprocess.Popen("subst /D %s" % (drive), shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
errmsg = p_subst.communicate()[0]
if p_subst.returncode != 0:
raise Exception("Error unsubsting '%s': %s" % (drive, errmsg))
def getSubstedDrives():
"""get substituted drive"""
driveInformation = {}
subStedDriveList = []
p_subst = subprocess.Popen("subst", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subStedDriveList = re.split('\\n', p_subst.communicate()[0])
del subStedDriveList[len(subStedDriveList)-1]
for path in subStedDriveList:
subStedDrivePath = []
if(re.search(r'UNC', path) is not None):
subStedDrivePath = re.split('=>', path)
(drive_to_unsubst, _) = os.path.splitdrive(os.path.normpath(subStedDrivePath[0]))
uncPath = re.sub('UNC', r'\\', subStedDrivePath[1].strip())
if(uncPath != subStedDrivePath[1].strip()):
driveInformation[drive_to_unsubst] = uncPath
else:
subStedDrivePath = re.split('=>', path)
(drive_to_unsubst, _) = os.path.splitdrive(os.path.normpath(subStedDrivePath[0]))
driveInformation[drive_to_unsubst] = os.path.normpath(subStedDrivePath[1].strip())
return driveInformation
def touch(srcdir):
"""
Recursively touches all the files in the source path mentioned.
It does not touch the directories.
"""
srcnames = os.listdir(srcdir)
for name in srcnames:
srcfname = os.path.join(srcdir, name)
if os.path.isdir(srcfname):
touch(srcfname)
else:
if os.path.exists(srcfname):
os.utime(srcfname, None)