#!/usr/bin/python -tt

from backend import errors
from backend.dispatcher import Worker
from backend.actions import Action
from bunch import Bunch
from retask.task import Task
from retask.queue import Queue
import ConfigParser
import daemon
import glob
import grp
import json
import lockfile
import logging
import multiprocessing
import optparse
import os
import pwd
import requests
import setproctitle
import signal
import sys
import time


def _get_conf(cp, section, option, default=None):
    """
    To make returning items from config parser less irritating
    """
    if cp.has_section(section) and cp.has_option(section, option):
        return cp.get(section, option)
    return default
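
# NOTE: when the option is present, _get_conf returns the raw string from
# ConfigParser; callers convert types explicitly where needed, e.g.
# int(_get_conf(cp, "backend", "sleeptime", 10)).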


class CoprJobGrab(multiprocessing.Process):

    """
    Fetch jobs from the Frontend
    - submit them to the jobs queue for workers
    """

    def __init__(self, opts, events, jobs, lock):
        # base class process initialization
        multiprocessing.Process.__init__(self, name="jobgrab")

        self.opts = opts            # backend config Bunch
        self.events = events        # shared queue consumed by CoprLog
        self.jobs = jobs            # shared queue feeding the Workers
        self.added_jobs = []        # ids of jobs already queued
        self.lock = lock

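    # event() stamps a message with the current time and puts it on the
    # shared events queue; the CoprLog process consumes and records it.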
    def event(self, what):
        self.events.put({"when": time.time(), "who": "jobgrab", "what": what})
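
    # fetch_jobs polls the frontend for waiting builds and pending actions.
    # Each build is written to its own json file in opts.jobsdir; that
    # file's existence (plus the added_jobs list) keeps a build from being
    # queued twice. Actions are executed inline rather than queued.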
    def fetch_jobs(self):
        try:
            r = requests.get(
                "{0}/waiting/".format(self.opts.frontend_url),
                auth=("user", self.opts.frontend_auth))

        except requests.RequestException as e:
            self.event("Error retrieving jobs from {0}: {1}".format(
                self.opts.frontend_url, e))
        else:
            try:
                r_json = json.loads(r.content)
            except ValueError as e:
                self.event("Error getting JSON build list from FE {0}"
                           .format(e))
                return

            if "builds" in r_json and r_json["builds"]:
                self.event("{0} jobs returned".format(len(r_json["builds"])))
                count = 0
                for b in r_json["builds"]:
                    if "id" in b:
                        extended_id = "{0}-{1}".format(b["id"], b["chroot"])
                        jobfile = os.path.join(
                            self.opts.jobsdir,
                            "{0}.json".format(extended_id))

                        if (not os.path.exists(jobfile) and
                                extended_id not in self.added_jobs):

                            count += 1
                            # write the job out; the file's existence marks
                            # it as known
                            with open(jobfile, "w") as fobj:
                                fobj.write(json.dumps(b))
                            self.event("Wrote job: {0}".format(extended_id))
                if count:
                    self.event("New jobs: {0}".format(count))

            if "actions" in r_json and r_json["actions"]:
                self.event("{0} actions returned".format(
                    len(r_json["actions"])))

                for action in r_json["actions"]:
                    ao = Action(self.opts, self.events, action, self.lock)
                    ao.run()

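    # The grabber's main loop: poll the frontend, then hand any job files
    # in opts.jobsdir that have not been queued yet to the workers' queue.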
    def run(self):
        setproctitle.setproctitle("CoprJobGrab")
        abort = False
        try:
            while not abort:
                self.fetch_jobs()
                for f in sorted(glob.glob(
                        os.path.join(self.opts.jobsdir, "*.json"))):

                    n = os.path.basename(f).replace(".json", "")
                    if n not in self.added_jobs:
                        self.jobs.put(f)
                        self.added_jobs.append(n)
                        self.event("adding to work queue id {0}".format(n))
                time.sleep(self.opts.sleeptime)
        except KeyboardInterrupt:
            return


class CoprLog(multiprocessing.Process):

    """log mechanism where items from the events queue get recorded"""

    def __init__(self, opts, events):
        # base class process initialization
        multiprocessing.Process.__init__(self, name="logger")

        self.opts = opts
        self.events = events

        # make sure the directory for the logfile exists
        logdir = os.path.dirname(self.opts.logfile)
        if not os.path.exists(logdir):
            os.makedirs(logdir, mode=0750)

        logging.basicConfig(filename=self.opts.logfile, level=logging.DEBUG)

    def log(self, event):
        when = time.strftime("%F %T", time.gmtime(event["when"]))
        msg = "{0} : {1}: {2}".format(when,
                                      event["who"],
                                      event["what"].strip())

        try:
            if self.opts.verbose:
                sys.stderr.write("{0}\n".format(msg))
                sys.stderr.flush()
            logging.debug(msg)
        except (IOError, OSError) as e:
            sys.stderr.write("Could not write to logfile {0} - {1}\n".format(
                self.opts.logfile, e))

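    # Consume events forever; an event is a dict and is only logged when it
    # carries all three expected keys: "when", "who" and "what".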
    def run(self):
        setproctitle.setproctitle("CoprLog")
        abort = False
        try:
            while not abort:
                e = self.events.get()
                if "when" in e and "who" in e and "what" in e:
                    self.log(e)
        except KeyboardInterrupt:
            return


class CoprBackend(object):

    """
    Core process - starts/stops/initializes workers
    """

    def __init__(self, config_file=None, ext_opts=None):
        # read in config file and stash the config items in a single
        # self.opts Bunch
        if not config_file:
            raise errors.CoprBackendError("Must specify config_file")

        self.config_file = config_file
        self.ext_opts = ext_opts  # merged over the config in read_conf()
        self.opts = self.read_conf()
        self.lock = multiprocessing.Lock()

        self.jobs = multiprocessing.Queue()
        self.events = multiprocessing.Queue()

        # events are dicts: {"when": timestamp, "who": sender, "what": msg}
        self._logger = CoprLog(self.opts, self.events)
        self._logger.start()

        self.event("Starting up Job Grabber")

        self._jobgrab = CoprJobGrab(self.opts, self.events, self.jobs,
                                    self.lock)
        self._jobgrab.start()
        self.worker_num = 0
        self.abort = False

        if not os.path.exists(self.opts.worker_logdir):
            os.makedirs(self.opts.worker_logdir, mode=0750)

        self.workers = []

    def event(self, what):
        self.events.put({"when": time.time(), "who": "main", "what": what})

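    # read_conf() is called again on every pass of the main loop in run(),
    # so most config changes take effect without restarting the backend.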
213 "read in config file - return Bunch of config data"
214 opts = Bunch()
215 cp = ConfigParser.ConfigParser()
216 try:
217 cp.read(self.config_file)
218 opts.results_baseurl = _get_conf(
219 cp, "backend", "results_baseurl", "http://copr")
220 opts.frontend_url = _get_conf(
221 cp, "backend", "frontend_url", "http://coprs/rest/api")
222 opts.frontend_auth = _get_conf(
223 cp, "backend", "frontend_auth", "PASSWORDHERE")
224
225 opts.architectures = _get_conf(
226 cp, "backend", "architectures", "i386,x86_64").split(",")
227
228 opts.spawn_playbook = {}
229 for arch in opts.architectures:
230 opts.spawn_playbook[arch] = _get_conf(
231 cp, "backend", "spawn_playbook-{0}".format(arch),
232 "/srv/copr-work/provision/builderpb-{0}.yml".format(arch))
233
234 opts.terminate_playbook = _get_conf(
235 cp, "backend", "terminate_playbook",
236 "/srv/copr-work/provision/terminatepb.yml")
237
238 opts.jobsdir = _get_conf(cp, "backend", "jobsdir", None)
239 opts.destdir = _get_conf(cp, "backend", "destdir", None)
240 opts.exit_on_worker = _get_conf(
241 cp, "backend", "exit_on_worker", False)
242 opts.fedmsg_enabled = _get_conf(
243 cp, "backend", "fedmsg_enabled", False)
244 opts.sleeptime = int(_get_conf(cp, "backend", "sleeptime", 10))
245 opts.num_workers = int(_get_conf(cp, "backend", "num_workers", 8))
246 opts.timeout = int(_get_conf(cp, "builder", "timeout", 1800))
247 opts.logfile = _get_conf(
248 cp, "backend", "logfile", "/var/log/copr/backend.log")
249 opts.verbose = _get_conf(cp, "backend", "verbose", False)
250 opts.worker_logdir = _get_conf(
251 cp, "backend", "worker_logdir", "/var/log/copr/workers/")
252 opts.spawn_vars = _get_conf(cp, "backend", "spawn_vars", None)
253 opts.terminate_vars = _get_conf(cp, "backend", "terminate_vars",
254 None)
255
256
257
258
259
260 except ConfigParser.Error as e:
261 raise errors.CoprBackendError(
262 "Error parsing config file: {0}: {1}".format(
263 self.config_file, e))
264
265 if not opts.jobsdir or not opts.destdir:
266 raise errors.CoprBackendError(
267 "Incomplete Config - must specify"
268 " jobsdir and destdir in configuration")
269
270 if self.ext_opts:
271 for v in self.ext_opts:
272 setattr(opts, v, self.ext_opts.get(v))
273 return opts
274
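
    # A minimal copr-be.conf sketch (values are illustrative, not shipped
    # defaults; jobsdir and destdir are the only options that must be set):
    #
    #   [backend]
    #   jobsdir=/var/lib/copr/jobs
    #   destdir=/var/lib/copr/results
    #   frontend_url=http://localhost/backend
    #   num_workers=4
    #
    #   [builder]
    #   timeout=1800
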
    def run(self):
        self.abort = False
        while not self.abort:
            # re-read the config on every pass
            self.opts = self.read_conf()

            if self.jobs.qsize():
                self.event("# jobs in queue: {0}".format(self.jobs.qsize()))

            # grow the worker pool back up to num_workers
            if len(self.workers) < self.opts.num_workers:
                self.event("Spinning up more workers for jobs")
                for _ in range(self.opts.num_workers - len(self.workers)):
                    self.worker_num += 1
                    w = Worker(
                        self.opts, self.jobs, self.events, self.worker_num,
                        lock=self.lock)
                    self.workers.append(w)
                    w.start()
                self.event("Finished starting worker processes")

            # reap dead workers (iterate over a copy, since the list is
            # mutated) or abort the whole backend if configured to
            for w in self.workers[:]:
                if not w.is_alive():
                    self.event("Worker {0} died unexpectedly".format(
                        w.worker_num))
                    if self.opts.exit_on_worker:
                        raise errors.CoprBackendError(
                            "Worker died unexpectedly, exiting")
                    else:
                        self.workers.remove(w)
                        w.terminate()

            time.sleep(self.opts.sleeptime)

    def terminate(self):
        """
        Cleanup backend processes (just workers for now)
        """
        self.abort = True
        # iterate over a copy: removing items from the list being iterated
        # would skip every other worker
        for w in self.workers[:]:
            self.workers.remove(w)
            w.terminate()


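# Only a few knobs are exposed on the command line; everything else comes
# from the config file. The Bunch returned here is merged over the parsed
# config by CoprBackend.read_conf() via ext_opts.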
def parse_args(args):
    parser = optparse.OptionParser("\ncopr-be [options]")
    parser.add_option("-c", "--config", default="/etc/copr/copr-be.conf",
                      dest="config_file",
                      help="config file to use for copr-be run")
    parser.add_option("-d", "--daemonize", default=False, dest="daemonize",
                      action="store_true", help="daemonize or not")
    parser.add_option("-p", "--pidfile",
                      default="/var/run/copr-backend/copr-be.pid",
                      dest="pidfile",
                      help="pid file to use for copr-be if daemonized")
    parser.add_option("-x", "--exit", default=False, dest="exit_on_worker",
                      action="store_true", help="exit on worker failure")
    parser.add_option("-v", "--verbose", default=False, dest="verbose",
                      action="store_true", help="be more verbose")

    opts, args = parser.parse_args(args)
    if not os.path.exists(opts.config_file):
        sys.stderr.write("No config file found at: {0}\n".format(
            opts.config_file))
        sys.exit(1)
    opts.config_file = os.path.abspath(opts.config_file)

    ret_opts = Bunch()
    for o in ("daemonize", "exit_on_worker", "pidfile", "config_file"):
        setattr(ret_opts, o, getattr(opts, o))

    return ret_opts


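# Run the backend under python-daemon's DaemonContext: drop privileges to
# the "copr" user/group, optionally detach from the terminal, and map
# SIGTERM/SIGHUP to a clean shutdown of the context.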
def main(args):
    opts = parse_args(args)

    try:
        context = daemon.DaemonContext(
            pidfile=lockfile.FileLock(opts.pidfile),
            gid=grp.getgrnam("copr").gr_gid,
            uid=pwd.getpwnam("copr").pw_uid,
            detach_process=opts.daemonize,
            umask=022,
            stderr=sys.stderr,
            signal_map={
                signal.SIGTERM: "terminate",
                signal.SIGHUP: "terminate",
            },
        )
        with context:
            cbe = CoprBackend(opts.config_file, ext_opts=opts)
            cbe.run()
    except (Exception, KeyboardInterrupt):
        sys.stderr.write("Killing/Dying\n")
        if "cbe" in locals():
            cbe.terminate()
        raise


if __name__ == "__main__":
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        sys.stderr.write("\nUser cancelled, may need cleanup\n")
        sys.exit(0)