1 | """ |
---|
2 | TestCmd.py: a testing framework for commands and scripts. |
---|
3 | |
---|
4 | The TestCmd module provides a framework for portable automated testing |
---|
5 | of executable commands and scripts (in any language, not just Python), |
---|
6 | especially commands and scripts that require file system interaction. |
---|
7 | |
---|
8 | In addition to running tests and evaluating conditions, the TestCmd module |
---|
9 | manages and cleans up one or more temporary workspace directories, and |
---|
10 | provides methods for creating files and directories in those workspace |
---|
11 | directories from in-line data (here-documents), allowing tests to be |
---|
12 | completely self-contained. |
---|
13 | |
---|
14 | A TestCmd environment object is created via the usual invocation: |
---|
15 | |
---|
16 | test = TestCmd() |
---|
17 | |
---|
18 | The TestCmd module provides pass_test(), fail_test(), and no_result() |
---|
19 | unbound methods that report test results for use with the Aegis change |
---|
20 | management system. These methods terminate the test immediately, |
---|
21 | reporting PASSED, FAILED, or NO RESULT respectively, and exiting with |
---|
22 | status 0 (success), 1 or 2 respectively. This allows for a distinction |
---|
23 | between an actual failed test and a test that could not be properly |
---|
24 | evaluated because of an external condition (such as a full file system |
---|
25 | or incorrect permissions). |
---|
26 | """ |
---|
27 | |
---|
28 | # Copyright 2000 Steven Knight |
---|
29 | # This module is free software, and you may redistribute it and/or modify |
---|
30 | # it under the same terms as Python itself, so long as this copyright message |
---|
31 | # and disclaimer are retained in their original form. |
---|
32 | # |
---|
33 | # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, |
---|
34 | # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF |
---|
35 | # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH |
---|
36 | # DAMAGE. |
---|
37 | # |
---|
38 | # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT |
---|
39 | # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A |
---|
40 | # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, |
---|
41 | # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, |
---|
42 | # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. |
---|
43 | |
---|
44 | # Copyright 2002-2003 Vladimir Prus. |
---|
45 | # Copyright 2002-2003 Dave Abrahams. |
---|
46 | # Copyright 2006 Rene Rivera. |
---|
47 | # Distributed under the Boost Software License, Version 1.0. |
---|
48 | # (See accompanying file LICENSE_1_0.txt or copy at |
---|
49 | # http://www.boost.org/LICENSE_1_0.txt) |
---|
50 | |
---|
51 | |
---|
52 | from string import join, split |
---|
53 | |
---|
54 | __author__ = "Steven Knight <knight@baldmt.com>" |
---|
55 | __revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software" |
---|
56 | __version__ = "0.02" |
---|
57 | |
---|
58 | from types import * |
---|
59 | |
---|
60 | import os |
---|
61 | import os.path |
---|
62 | import popen2 |
---|
63 | import re |
---|
64 | import shutil |
---|
65 | import stat |
---|
66 | import sys |
---|
67 | import tempfile |
---|
68 | import traceback |
---|
69 | |
---|
# Set the tempfile module's name template to 'testcmd.'.
# NOTE(review): newer Python versions may ignore this attribute -- confirm.
tempfile.template = 'testcmd.'

# Test environments that have been registered for cleanup at exit.
_Cleanup = []

def _clean():
    """Call cleanup() on every registered test environment.

    The registry is emptied first, then the environments are cleaned up
    in reverse order of registration (most recently registered first).
    """
    global _Cleanup
    pending = _Cleanup[:]
    _Cleanup = []
    pending.reverse()
    for test in pending:
        test.cleanup()

# Register through atexit so that exit handlers installed by other code
# are not overwritten; assign sys.exitfunc directly only when the atexit
# module is not available.
try:
    import atexit
except ImportError:
    sys.exitfunc = _clean
else:
    atexit.register(_clean)
83 | |
---|
def _caller(tblist, skip):
    """Describe the call chain that led into this module.

    tblist is a sequence of (file, line, name, text) entries, outermost
    call first, as produced by traceback.extract_stack().  Entries are
    taken up to (not including) the first one whose file name ends in
    "TestCmd.py", then listed innermost first.  The first `skip` entries
    of that innermost-first list are omitted.

    Returns a string with one line per remaining entry: the first is
    prefixed with "at", the following ones with a tab and "from".
    Returns "" when nothing remains.
    """
    frames = []
    for filename, line, name, text in tblist:
        if filename[-10:] == "TestCmd.py":
            break
        frames.append((filename, line, name))
    # Collected outermost first; report innermost first.
    frames.reverse()

    pieces = []
    atfrom = "at"
    for filename, line, name in frames[skip:]:
        # Module-level code is reported as "?" by old interpreters and as
        # "<module>" by newer ones; neither is a useful function name.
        if name in ("?", "<module>"):
            suffix = ""
        else:
            suffix = " (" + name + ")"
        pieces.append("%s line %d of %s%s\n" % (atfrom, line, filename, suffix))
        atfrom = "\tfrom"
    return "".join(pieces)
100 | |
---|
def fail_test(self = None, condition = 1, function = None, skip = 0):
    """Report that the test FAILED and exit with status 1.

    Does nothing when `condition` is false.  Otherwise `function`, if
    given, is called first.  The message written to standard error names
    the program and description of the test environment `self` (when
    supplied), the calling location (with `skip` innermost callers
    omitted), and the current directory.
    """
    if not condition:
        return
    if function is not None:
        function()

    subject = ""
    label = ""
    multiline = 0
    if self is not None:
        if self.program:
            subject = " of " + " ".join(self.program)
            multiline = 1
        if self.description:
            label = " [" + self.description + "]"
            multiline = 1
    # The location goes on its own line once a program or description
    # has been printed.
    if multiline:
        gap = "\n\t"
    else:
        gap = " "

    where = _caller(traceback.extract_stack(), skip)

    message = "FAILED test" + subject + label + gap + where
    message = message + "\nin directory: " + os.getcwd()
    sys.stderr.write(message)

    sys.exit(1)
129 | |
---|
def no_result(self = None, condition = 1, function = None, skip = 0):
    """Report NO RESULT for the test and exit with status 2.

    Does nothing when `condition` is false; the test ends with no result
    only if the condition is true.  Otherwise `function`, if given, is
    called first.  The message written to standard error names the
    program and description of the test environment `self` (when
    supplied) and the calling location, with `skip` innermost callers
    omitted.
    """
    if not condition:
        return
    if function is not None:
        function()
    of = ""
    desc = ""
    sep = " "
    if self is not None:
        if self.program:
            # self.program is a list of command words (as in fail_test),
            # so it has to be joined before concatenation.
            of = " of " + " ".join(self.program)
            sep = "\n\t"
        if self.description:
            desc = " [" + self.description + "]"
            sep = "\n\t"

    at = _caller(traceback.extract_stack(), skip)
    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)

    sys.exit(2)
156 | |
---|
def pass_test(self = None, condition = 1, function = None):
    """Report PASSED for the test and exit with status 0.

    Nothing happens when `condition` is false.  Otherwise `function`, if
    given, is called, "PASSED" is written to standard error and the
    process exits.  The `self` argument is accepted but not used.
    """
    if condition:
        if function is not None:
            function()
        sys.stderr.write("PASSED\n")
        sys.exit(0)
170 | |
---|
def match_exact(lines = None, matches = None):
    """Compare two texts line by line for exact equality.

    Each argument may be a list of lines or a single string; a string is
    split on newline characters first.  Returns 1 when both have the same
    number of lines and every pair of lines is equal, None otherwise.
    """
    if type(lines) is not list:
        lines = lines.split("\n")
    if type(matches) is not list:
        matches = matches.split("\n")
    if len(lines) != len(matches):
        return
    for actual, expected in zip(lines, matches):
        if actual != expected:
            return
    return 1
184 | |
---|
def match_re(lines = None, res = None):
    """Compare text against regular expressions, line by line.

    Each argument may be a list or a single string; a string is split on
    newline characters first.  Every expression in `res` is anchored with
    "^" and "$" and searched for in the corresponding line.  Returns 1
    when the line counts agree and every line matches, None otherwise.
    """
    if type(lines) is not list:
        lines = lines.split("\n")
    if type(res) is not list:
        res = res.split("\n")
    if len(lines) != len(res):
        return
    for actual, pattern in zip(lines, res):
        if re.compile("^" + pattern + "$").search(actual) is None:
            return
    return 1
198 | |
---|
class TestCmd:
    """Test environment for running a command and examining its results.

    An instance records the program under test, optionally creates a
    temporary working directory, runs the program while capturing its
    standard output and error output, and removes the directories it
    created when cleanup() is called.
    """

    def __init__(self, description = None,
                       program = None,
                       interpreter = None,
                       workdir = None,
                       subdir = None,
                       verbose = 0,
                       match = None,
                       inpath = None):
        """Set up the test environment.

        description -- text describing the functionality being tested.
        program     -- list of command words for the program under test.
        interpreter -- stored on the instance; run() does not use it.
        workdir     -- None for no working directory, '' for a freshly
                       created temporary one, otherwise a path (see
                       workdir_set()).
        subdir      -- a subdirectory to create under the working
                       directory (see subdir()).
        verbose     -- when true, run() echoes commands and output.
        match       -- comparison function used by match(); defaults to
                       the module-level match_re().
        inpath      -- when true, `program` is stored unchanged instead
                       of being passed through program_set().
        """
        # Set first so that cleanup(), which __del__ calls, is safe even
        # if construction fails part-way through.
        self._dirlist = []
        self._cwd = os.getcwd()
        self.description_set(description)
        if inpath:
            self.program = program
        else:
            self.program_set(program)
        self.interpreter_set(interpreter)
        self.verbose_set(verbose)
        if match is not None:
            self.match_func = match
        else:
            self.match_func = match_re
        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
        # Compare by value: an identity test against a string literal
        # only works by accident of the interpreter's string caching.
        if os.environ.get('PRESERVE', '') != '':
            keep = os.environ['PRESERVE']
            for outcome in ('pass_test', 'fail_test', 'no_result'):
                self._preserve[outcome] = keep
        else:
            for outcome, variable in (('pass_test', 'PRESERVE_PASS'),
                                      ('fail_test', 'PRESERVE_FAIL'),
                                      ('no_result', 'PRESERVE_NO_RESULT')):
                try:
                    self._preserve[outcome] = os.environ[variable]
                except KeyError:
                    pass
        self._stdout = []
        self._stderr = []
        self.status = None
        self.condition = 'no_result'
        self.workdir_set(workdir)
        self.subdir(subdir)

    def __del__(self):
        self.cleanup()

    def __repr__(self):
        return "%x" % id(self)

    def _resolve_path(self, file):
        """Turn a file argument into a path name.

        A list is joined with os.path.join(); a relative result is taken
        to be under the working directory.
        """
        if isinstance(file, list):
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        return file

    def _select_run(self, outputs, run):
        """Pick the entry of `outputs` for run number `run`.

        A false run number selects the last entry, a negative one counts
        back from the last entry.  Returns '' when the resulting position
        lies before the first entry.
        """
        if not run:
            run = len(outputs)
        elif run < 0:
            run = len(outputs) + run
        run = run - 1
        if run < 0:
            return ''
        return outputs[run]

    def _write_stdin(self, pipe, stdin):
        """Write `stdin` (a string or a list of strings) to `pipe`."""
        if isinstance(stdin, list):
            for line in stdin:
                pipe.write(line)
        else:
            pipe.write(stdin)

    def cleanup(self, condition = None):
        """Removes any temporary working directories for the specified
        TestCmd environment.  If the environment variable PRESERVE was
        set when the TestCmd environment was created, temporary working
        directories are not removed.  If any of the environment variables
        PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
        when the TestCmd environment was created, then temporary working
        directories are not removed if the test passed, failed, or had
        no result, respectively.  Temporary working directories are also
        preserved for conditions specified via the preserve method.

        Typically, this method is not called directly, but is used when
        the script exits to clean up temporary working directories as
        appropriate for the exit status.
        """
        # getattr: __del__ may run on an instance whose __init__ never
        # got as far as creating the attribute.
        dirs = getattr(self, '_dirlist', None)
        if not dirs:
            return
        if condition is None:
            condition = self.condition
        if self._preserve[condition]:
            for path in dirs:
                sys.stdout.write("Preserved directory %s\n" % path)
        else:
            doomed = dirs[:]
            doomed.reverse()
            for path in doomed:
                # Restore write permission first so removal can succeed.
                self.writable(path, 1)
                shutil.rmtree(path, ignore_errors = 1)

        self._dirlist = []
        self.workdir = None
        os.chdir(self._cwd)
        try:
            _Cleanup.remove(self)
        except (AttributeError, ValueError):
            pass

    def description_set(self, description):
        """Set the description of the functionality being tested.
        """
        self.description = description

    # TODO: diff() -- diff two arrays (not implemented).

    def fail_test(self, condition = 1, function = None, skip = 0):
        """Cause the test to fail.

        Records 'fail_test' as the outcome, then reports through the
        module-level fail_test().  Does nothing if `condition` is false.
        """
        if not condition:
            return
        self.condition = 'fail_test'
        fail_test(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def interpreter_set(self, interpreter):
        """Set the program to be used to interpret the program
        under test as a script.
        """
        self.interpreter = interpreter

    def match(self, lines, matches):
        """Compare actual and expected file contents using the match
        function chosen when the environment was created.
        """
        return self.match_func(lines, matches)

    def match_exact(self, lines, matches):
        """Compare actual and expected file contents for exact equality
        (delegates to the module-level match_exact()).
        """
        return match_exact(lines, matches)

    def match_re(self, lines, res):
        """Compare actual file contents against regular expressions
        (delegates to the module-level match_re()).
        """
        return match_re(lines, res)

    def no_result(self, condition = 1, function = None, skip = 0):
        """Report that the test could not be run.

        Records 'no_result' as the outcome, then reports through the
        module-level no_result().  Does nothing if `condition` is false.
        """
        if not condition:
            return
        self.condition = 'no_result'
        no_result(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def pass_test(self, condition = 1, function = None):
        """Cause the test to pass.

        Records 'pass_test' as the outcome, then reports through the
        module-level pass_test().  Does nothing if `condition` is false.
        """
        if not condition:
            return
        self.condition = 'pass_test'
        pass_test(self = self, condition = condition, function = function)

    def preserve(self, *conditions):
        """Arrange for the temporary working directories for the
        specified TestCmd environment to be preserved for one or more
        conditions.  If no conditions are specified, arranges for
        the temporary working directories to be preserved for all
        conditions.
        """
        # Truth test, not an identity test against the () literal.
        if not conditions:
            conditions = ('pass_test', 'fail_test', 'no_result')
        for cond in conditions:
            self._preserve[cond] = 1

    def program_set(self, program):
        """Set the executable program or script to be tested.

        `program` is a list of command words.  A relative first word is
        made absolute against the directory that was current when the
        environment was created.  A list argument is copied, so the
        caller's list is left untouched.
        """
        if program and program[0] and not os.path.isabs(program[0]):
            if isinstance(program, list):
                program = program[:]
            program[0] = os.path.join(self._cwd, program[0])
        self.program = program

    def read(self, file, mode = 'rb'):
        """Reads and returns the contents of the specified file name.
        The file name may be a list, in which case the elements are
        concatenated with the os.path.join() method.  The file is
        assumed to be under the temporary working directory unless it
        is an absolute path name.  The I/O mode for the file may
        be specified; it must begin with an 'r'.  The default is
        'rb' (binary read).

        Raises ValueError if the mode does not begin with 'r'.
        """
        file = self._resolve_path(file)
        if mode[0] != 'r':
            raise ValueError("mode must begin with 'r'")
        handle = open(file, mode)
        try:
            return handle.read()
        finally:
            handle.close()

    def run(self, program = None,
                  interpreter = None,
                  arguments = None,
                  chdir = None,
                  stdin = None):
        """Runs a test of the program or script for the test
        environment.  Standard output and error output are saved for
        future retrieval via the stdout() and stderr() methods.

        program     -- list of command words to run instead of the
                       environment's program; a relative first word that
                       differs from the environment's program is made
                       absolute against the original directory.
        interpreter -- accepted but not used.
        arguments   -- string of extra arguments, split on single spaces.
        chdir       -- directory to run in (relative names are taken
                       under the working directory); the previous
                       directory is restored afterwards, even on error.
        stdin       -- string or list of strings written to the child's
                       standard input.

        The child's exit status is stored in self.status.
        """
        oldcwd = None
        if chdir:
            oldcwd = os.getcwd()
            if not os.path.isabs(chdir):
                chdir = self.workpath(chdir)
            if self.verbose:
                sys.stderr.write("chdir(" + chdir + ")\n")
            os.chdir(chdir)
        try:
            cmd = []
            if program and program[0]:
                # Work on a copy so the caller's list is not modified.
                if isinstance(program, list):
                    program = program[:]
                is_default = self.program and program[0] == self.program[0]
                if not is_default and not os.path.isabs(program[0]):
                    program[0] = os.path.join(self._cwd, program[0])
                cmd += program
            else:
                cmd += self.program
            if arguments:
                cmd += arguments.split(" ")
            if self.verbose:
                sys.stderr.write(" ".join(cmd) + "\n")
            # NOTE(review): stdout is read to end-of-file before stderr;
            # a child producing a lot of error output could presumably
            # block on a full pipe -- confirm before relying on this for
            # noisy commands.
            try:
                p = popen2.Popen3(cmd, 1)
            except AttributeError:
                # popen2.Popen3 is not available; use os.popen3 instead.
                (tochild, fromchild, childerr) = os.popen3(" ".join(cmd))
                if stdin:
                    self._write_stdin(tochild, stdin)
                tochild.close()
                self._stdout.append(fromchild.read())
                self._stderr.append(childerr.read())
                fromchild.close()
                self.status = childerr.close()
                if not self.status:
                    self.status = 0
            else:
                if stdin:
                    self._write_stdin(p.tochild, stdin)
                p.tochild.close()
                self._stdout.append(p.fromchild.read())
                self._stderr.append(p.childerr.read())
                self.status = p.wait()

            if self.verbose:
                sys.stdout.write(self._stdout[-1])
                sys.stderr.write(self._stderr[-1])
        finally:
            if oldcwd is not None:
                os.chdir(oldcwd)

    def stderr(self, run = None):
        """Returns the error output from the specified run number.
        If there is no specified run number, then returns the error
        output of the last run.  If the run number is less than zero,
        then returns the error output from that many runs back from the
        current run.
        """
        return self._select_run(self._stderr, run)

    def stdout(self, run = None):
        """Returns the standard output from the specified run number.
        If there is no specified run number, then returns the standard
        output of the last run.  If the run number is less than zero,
        then returns the standard output from that many runs back from
        the current run.
        """
        return self._select_run(self._stdout, run)

    def subdir(self, *subdirs):
        """Create new subdirectories under the temporary working
        directory, one for each argument.  An argument may be a list,
        in which case the list elements are concatenated using the
        os.path.join() method.  Subdirectories multiple levels deep
        must be created using a separate argument for each level:

                test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])

        Arguments that are None are skipped.
        Returns the number of subdirectories actually created.
        """
        count = 0
        for sub in subdirs:
            if sub is None:
                continue
            if isinstance(sub, list):
                sub = os.path.join(*sub)
            new = os.path.join(self.workdir, sub)
            try:
                os.mkdir(new)
            except EnvironmentError:
                # Not created (for instance, it already exists): it is
                # simply not counted.
                pass
            else:
                count = count + 1
        return count

    def unlink (self, file):
        """Unlinks the specified file name.
        The file name may be a list, in which case the elements are
        concatenated with the os.path.join() method.  The file is
        assumed to be under the temporary working directory unless it
        is an absolute path name.
        """
        os.unlink(self._resolve_path(file))

    def verbose_set(self, verbose):
        """Set the verbose level.
        """
        self.verbose = verbose

    def workdir_set(self, path):
        """Sets the working directory for the environment.

        None means no working directory.  An absolute path is used as
        given; it is neither created nor registered for removal.  A null
        string ('') creates a uniquely named temporary directory, and
        any other relative path is created with os.mkdir(); in both of
        these cases the directory is registered for removal by cleanup()
        and its canonical path is stored.
        """
        if path is None:
            self.workdir = None
            return
        if os.path.isabs(path):
            self.workdir = path
            return
        if path == '':
            # mkdtemp() creates the directory itself, which avoids the
            # window between picking a name and creating the directory.
            path = tempfile.mkdtemp(prefix = 'testcmd.')
        else:
            os.mkdir(path)
        self._dirlist.append(path)
        if self not in _Cleanup:
            _Cleanup.append(self)
        # We'd like to set self.workdir like this:
        #     self.workdir = path
        # But symlinks in the path will report things
        # differently from os.getcwd(), so chdir there
        # and back to fetch the canonical path.
        cwd = os.getcwd()
        os.chdir(path)
        try:
            self.workdir = os.getcwd()
        finally:
            os.chdir(cwd)

    def workpath(self, *args):
        """Returns the absolute path name to a subdirectory or file
        within the current temporary working directory.  Concatenates
        the temporary working directory name with the specified
        arguments using the os.path.join() method.
        """
        return os.path.join(self.workdir, *args)

    def writable(self, top, write):
        """Make the specified directory tree writable (write == 1)
        or not (write == None).

        Only the owner's write permission bit is changed.  Problems
        changing modes are ignored; the first one ends the traversal.
        """
        if write:
            def adjust(mode):
                return stat.S_IMODE(mode | stat.S_IWUSR)
        else:
            def adjust(mode):
                return stat.S_IMODE(mode & ~stat.S_IWUSR)

        try:
            for dirname, subdirs, files in os.walk(top):
                os.chmod(dirname, adjust(os.stat(dirname)[stat.ST_MODE]))
                for name in subdirs + files:
                    entry = os.path.join(dirname, name)
                    os.chmod(entry, adjust(os.stat(entry)[stat.ST_MODE]))
        except EnvironmentError:
            pass # ignore any problems changing modes

    def write(self, file, content, mode = 'wb'):
        """Writes the specified content text (second argument) to the
        specified file name (first argument).  The file name may be
        a list, in which case the elements are concatenated with the
        os.path.join() method.  The file is created under the temporary
        working directory.  Any subdirectories in the path must already
        exist.  The I/O mode for the file may be specified; it must
        begin with a 'w'.  The default is 'wb' (binary write).

        Raises ValueError if the mode does not begin with 'w'.
        """
        file = self._resolve_path(file)
        if mode[0] != 'w':
            raise ValueError("mode must begin with 'w'")
        handle = open(file, mode)
        try:
            handle.write(content)
        finally:
            handle.close()