Package translate :: Package storage :: Module bundleprojstore
[hide private]
[frames | no frames]

Source Code for Module translate.storage.bundleprojstore

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright 2010 Zuza Software Foundation 
  5  # 
  6  # This file is part of the Translate Toolkit. 
  7  # 
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or 
 11  # (at your option) any later version. 
 12  # 
 13  # This program is distributed in the hope that it will be useful, 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  # GNU General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with this program; if not, see <http://www.gnu.org/licenses/>. 
 20   
 21  import os 
 22  import shutil 
 23  import tempfile 
 24  from zipfile import ZipFile 
 25   
 26  from translate.storage.projstore import * 
 27   
 28  __all__ = ['BundleProjectStore', 'InvalidBundleError'] 
class InvalidBundleError(Exception):
    """Raised when a zip archive is not a valid translate project bundle."""
33
class BundleProjectStore(ProjectStore):
    """Represents a translate project bundle (zip archive).

    The archive holds a C{project.xtp} settings file next to the project's
    files. Files handed out by L{get_file} are extracted to temporary files
    on disk; C{self._tempfiles} maps each temporary file name to the project
    file name it was extracted from, so that L{save} can write changes back
    and L{cleanup} can remove the temporary files again."""

    # INITIALIZERS #
    def __init__(self, fname):
        """Open the bundle C{fname}, or create a new, empty bundle if
        C{fname} is not an existing file.

        @param fname: Path of the bundle zip archive."""
        super(BundleProjectStore, self).__init__()
        # Maps temporary file name -> project file name (see get_file()).
        self._tempfiles = {}
        if fname and os.path.isfile(fname):
            self.load(fname)
        else:
            # Start a fresh archive; save() writes the initial project.xtp.
            self.zip = ZipFile(fname, 'w')
            self.save()
            self.zip.close()
            self.zip = ZipFile(fname, 'a')

    # CLASS METHODS #
    @classmethod
    def from_project(cls, proj, fname=None):
        """Create a bundle holding the files and settings of C{proj}.

        @param proj: The project to copy source, translation and target
                     files and the settings from.
        @param fname: Path of the bundle archive; defaults to
                      C{'bundle.zip'}.
        @returns: The saved bundle (an instance of C{cls})."""
        if fname is None:
            fname = 'bundle.zip'

        # Instantiate cls (not the hard-coded base class) so that calling
        # this on a subclass yields an instance of that subclass.
        bundle = cls(fname)
        for fn in proj.sourcefiles:
            bundle.append_sourcefile(proj.get_file(fn))
        for fn in proj.transfiles:
            bundle.append_transfile(proj.get_file(fn))
        for fn in proj.targetfiles:
            bundle.append_targetfile(proj.get_file(fn))
        bundle.settings = proj.settings.copy()
        bundle.save()
        return bundle

    # METHODS #
    def append_file(self, afile, fname, ftype='trans', delete_orig=False):
        """Append the given file to the project with the given filename, marked
        to be of type C{ftype} ('src', 'trans', 'tgt').

        @param delete_orig: If C{True}, as set by
                            L{project.convert_forward()}, C{afile} is
                            deleted after appending, if possible.
        NOTE: For this implementation, the appended file will be deleted
              from disk if C{delete_orig} is C{True}.
        @returns: A tuple of the file object returned by L{get_file} for the
                  appended file and its project file name.
        @raises ValueError: If a file of that name is already in the
                            archive."""
        archived = self.zip.namelist()
        if fname and fname in archived:
            raise ValueError("File already in bundle archive: %s" % (fname,))
        if not fname and isinstance(afile, basestring) and afile in archived:
            raise ValueError("File already in bundle archive: %s" % (afile,))

        afile, fname = super(BundleProjectStore, self).append_file(afile, fname, ftype)
        self._zip_add(fname, afile)

        # Never delete one of our own temporary files here; they are managed
        # through self._tempfiles.
        if delete_orig and hasattr(afile, 'name') and afile.name not in self._tempfiles:
            try:
                os.unlink(afile.name)
            except (OSError, TypeError, ValueError):
                # Deleting the original is best effort ("if possible"): the
                # name may not be a removable path.
                pass

        return self.get_file(fname), fname

    def remove_file(self, fname, ftype=None):
        """Remove the file with the given project name from the project.

        The file is also deleted from the zip archive, and any temporary
        files extracted for it are removed."""
        super(BundleProjectStore, self).remove_file(fname, ftype)
        self._zip_delete([fname])
        # Build the list first: entries are deleted from the dict below.
        stale = [tmpf for tmpf, prjf in self._tempfiles.items() if prjf == fname]
        for tmpf in stale:
            try:
                os.unlink(tmpf)
            except OSError:
                # Best effort: the temporary file may already be gone.
                pass
            del self._tempfiles[tmpf]

    def close(self):
        """Close the project store, remove the temporary files and close the
        zip archive."""
        super(BundleProjectStore, self).close()
        self.cleanup()
        self.zip.close()

    def cleanup(self):
        """Clean up our mess: remove temporary files."""
        for tempfname in self._tempfiles:
            if os.path.isfile(tempfname):
                os.unlink(tempfname)
        self._tempfiles = {}

    def get_file(self, fname):
        """Retrieve a project file (source, translation or target file) from the
        project archive.

        The file is extracted to a temporary file, unless a temporary file
        extracted earlier for it still exists.

        @returns: A newly opened file object on the temporary file.
        @raises FileNotInProjectError: If C{fname} is neither a known project
                                       file nor in the zip archive."""
        if fname not in self._files and fname not in self.zip.namelist():
            raise FileNotInProjectError(fname)

        # Check if the file has not already been extracted to a temp file
        # (that still exists on disk).
        extracted = [tfn for tfn, prjf in self._tempfiles.items()
                     if prjf == fname and os.path.isfile(tfn)]
        if extracted:
            tempfname = extracted[0]
        else:
            # Extract the file to a temporary file
            suffix = '_' + os.path.split(fname)[-1]
            tempfd, tempfname = tempfile.mkstemp(suffix=suffix)
            os.close(tempfd)
            # Close the handle before reopening so the contents are flushed.
            # NOTE: the open modes are deliberately left as they were.
            with open(tempfname, 'w') as tmp:
                tmp.write(self.zip.read(fname))

        retfile = open(tempfname)
        self._tempfiles[tempfname] = fname
        return retfile

    def get_proj_filename(self, realfname):
        """Try and find a project file name for the given real file name.

        @raises ValueError: If C{realfname} is known neither to the parent
                            class nor as one of our temporary files."""
        try:
            fname = super(BundleProjectStore, self).get_proj_filename(realfname)
        except ValueError:
            fname = None
        if fname:
            return fname
        if realfname in self._tempfiles:
            return self._tempfiles[realfname]
        raise ValueError('Real file not in project store: %s' % (realfname,))

    def load(self, zipname):
        """Load the bundle project from the zip file of the given name."""
        self.zip = ZipFile(zipname, mode='a')
        self._load_settings()

        append_section = {
            'sources': self._sourcefiles.append,
            'targets': self._targetfiles.append,
            'transfiles': self._transfiles.append,
        }
        for section in ('sources', 'targets', 'transfiles'):
            if section in self.settings:
                for fname in self.settings[section]:
                    append_section[section](fname)
                    # No cached file object; get_file() extracts on demand.
                    self._files[fname] = None

    def save(self, filename=None):
        """Save all project files to the bundle zip file.

        @param filename: If given, the new archive is written to this path
                         instead of a temporary file before it replaces the
                         current bundle.
        NOTE(review): in both cases the new archive is then moved over the
        current bundle file (see L{_replace_project_zip}), so nothing remains
        at C{filename} afterwards - confirm that this is intended."""
        self._update_from_tempfiles()

        if filename:
            newzip = ZipFile(filename, 'w')
        else:
            newzip = self._create_temp_zipfile()

        # Write project file for the new zip bundle
        newzip.writestr('project.xtp', self._generate_settings())
        # Copy project files from project to the new zip file
        project_files = self._sourcefiles + self._transfiles + self._targetfiles
        for fname in project_files:
            pfile = self.get_file(fname)
            try:
                newzip.writestr(fname, pfile.read())
            finally:
                pfile.close()
        # Copy any extra (non-project) files from the current zip
        handled = set(project_files)
        handled.add('project.xtp')
        for fname in self.zip.namelist():
            if fname in handled:
                continue
            newzip.writestr(fname, self.zip.read(fname))

        self._replace_project_zip(newzip)

    def update_file(self, pfname, infile):
        """Updates the file with the given project file name with the contents
        of C{infile}.

        @returns: The result of the parent class's C{update_file} if the file
                  is not in the zip archive yet; otherwise C{None}.
        @raises FileNotInProjectError: If C{pfname} is not a project file."""
        if pfname not in self._files:
            raise FileNotInProjectError(pfname)

        if pfname not in self.zip.namelist():
            return super(BundleProjectStore, self).update_file(pfname, infile)

        # Replace the archived copy with the new contents.
        self._zip_delete([pfname])
        self._zip_add(pfname, infile)

    def _load_settings(self):
        """Grab the project.xtp file from the zip file and load it.

        @raises InvalidBundleError: If the archive has no C{project.xtp}."""
        if 'project.xtp' not in self.zip.namelist():
            raise InvalidBundleError('Not a translate project bundle')
        # ZipFile.read() leaves no open member handle behind.
        super(BundleProjectStore, self)._load_settings(self.zip.read('project.xtp'))

    def _create_temp_zipfile(self):
        """Create a new zip file with a temporary file name (with mode 'w')."""
        newzipfd, newzipfname = tempfile.mkstemp(prefix='translate_bundle', suffix='.zip')
        # Only the reserved name is needed; ZipFile opens the file itself.
        os.close(newzipfd)
        return ZipFile(newzipfname, 'w')

    @staticmethod
    def _close_zip(zfile):
        """Close the zip file C{zfile} unless it is closed already."""
        # ZipFile.close() resets C{fp} to None, so check for that before
        # looking at the underlying file object.
        fp = getattr(zfile, 'fp', None)
        if fp is not None and not fp.closed:
            zfile.close()

    def _replace_project_zip(self, zfile):
        """Replace the currently used zip file (C{self.zip}) with the given zip
        file. Basically, C{os.rename(zfile.filename, self.zip.filename)}."""
        target = self.zip.filename
        # Both archives must be closed before the file is moved into place.
        self._close_zip(zfile)
        self._close_zip(self.zip)
        shutil.move(zfile.filename, target)
        self.zip = ZipFile(target, mode='a')

    def _update_from_tempfiles(self):
        """Update project files from temporary files."""
        # Iterate over a snapshot: update_file() may end up changing
        # self._tempfiles.
        for tempfname, pfname in list(self._tempfiles.items()):
            if not os.path.isfile(tempfname):
                # The temporary file has been removed; nothing to read back.
                continue
            tmp = open(tempfname)
            try:
                self.update_file(pfname, tmp)
            finally:
                tmp.close()

    def _zip_add(self, pfname, infile):
        """Add the contents of C{infile} to the zip with file name C{pfname}."""
        if hasattr(infile, 'seek'):
            # Make sure the whole file is read, whatever was read before.
            infile.seek(0)
        self.zip.writestr(pfname, infile.read())
        # Clear the cached file object to force the file to be read from the
        # zip file.
        self._files[pfname] = None

    def _zip_delete(self, fnames):
        """Delete the files with the given names from the zip file (C{self.zip}).

        The archive is rewritten without the given files, as members are not
        removed from the zip file in place.

        @param fnames: A list or tuple of archive member names.
        @raises ValueError: If C{fnames} is not a list or tuple, or there is
                            no zip file.
        @raises KeyError: If one of the names is not in the archive."""
        # Sanity checking
        if not isinstance(fnames, (list, tuple)):
            raise ValueError("fnames must be list or tuple: %s" % (fnames,))
        if not self.zip:
            raise ValueError("No zip file to work on")
        zippedfiles = self.zip.namelist()
        for fn in fnames:
            if fn not in zippedfiles:
                raise KeyError("File not in zip archive: %s" % (fn,))

        newzip = self._create_temp_zipfile()
        newzip.writestr('project.xtp', self._generate_settings())

        for fname in zippedfiles:
            # Copy all files from self.zip that are not project.xtp (already
            # in the new zip file) or in fnames (they are to be removed, after
            # all).
            if fname in fnames or fname == 'project.xtp':
                continue
            newzip.writestr(fname, self.zip.read(fname))

        self._replace_project_zip(newzip)
272