libdap  Updated for version 3.17.2
MarshallerThread.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2015 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 /*
26  * MarshallerThread.cc
27  *
28  * Created on: Aug 27, 2015
29  * Author: jimg
30  */
31 
32 #include "config.h"
33 
34 #include <pthread.h>
35 #include <sys/time.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38 
39 #include <ostream>
40 #include <sstream>
41 
42 #include "MarshallerThread.h"
43 #include "Error.h"
44 #include "InternalErr.h"
45 #include "debug.h"
46 
47 using namespace libdap;
48 using namespace std;
49 
50 #if 0
51 bool MarshallerThread::print_time = false;
52 
/**
 * Compute the elapsed time between two timestamps.
 *
 * Despite the name, the value returned is in seconds (with a fractional
 * part), not hundredths of a second.
 *
 * The previous version normalised the microsecond fields by rewriting
 * \c *start before subtracting. Because the result is computed in floating
 * point, that carry step never changed the answer; it only modified the
 * caller's data. This version leaves both arguments untouched.
 *
 * @param stop The later timestamp; read only.
 * @param start The earlier timestamp; read only.
 * @return stop - start, in seconds.
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    const double whole_seconds = static_cast<double>(stop->tv_sec - start->tv_sec);
    // May be negative when stop's microsecond field is the smaller one; the
    // sum with whole_seconds is still the correct signed difference.
    const double micro_seconds = static_cast<double>(stop->tv_usec - start->tv_usec);

    return whole_seconds + micro_seconds / 1000000.0;
}
76 #endif
77 
78 
88 Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
89  m_mutex(lock)
90 {
91  int status = pthread_mutex_lock(&m_mutex);
92 
93  DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
94 
95  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
96  while (count != 0) {
97  status = pthread_cond_wait(&cond, &m_mutex);
98  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
99  }
100  if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
101 
102  DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
103 }
104 
109 {
110  DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
111 
112  int status = pthread_mutex_unlock(&m_mutex);
113  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
114 }
115 
116 
130 ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
131  m_mutex(lock), m_cond(cond), m_count(count)
132 {
133  int status = pthread_mutex_lock(&m_mutex);
134 
135  DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
136 
137  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
138 
139  DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
140 }
141 
142 ChildLocker::~ChildLocker()
143 {
144  DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
145 
146  m_count = 0;
147  int status = pthread_cond_signal(&m_cond);
148  if (status != 0)
149  throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");
150 
151  status = pthread_mutex_unlock(&m_mutex);
152  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
153 }
154 
155 MarshallerThread::MarshallerThread() :
156  d_thread(0), d_child_thread_count(0)
157 {
158  if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");
159  if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
160  throw Error(internal_error, "Failed to complete pthread attribute initialization.");
161 
162  if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex.");
163  if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond.");
164 }
165 
166 MarshallerThread::~MarshallerThread()
167 {
168  int status = pthread_mutex_lock(&d_out_mutex);
169  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
170  while (d_child_thread_count != 0) {
171  status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
172  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
173  }
174  if (d_child_thread_count != 0)
175  throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
176 
177  status = pthread_mutex_unlock(&d_out_mutex);
178  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
179 
180  pthread_mutex_destroy(&d_out_mutex);
181  pthread_cond_destroy(&d_out_cond);
182 
183  pthread_attr_destroy(&d_thread_attr);
184 }
185 
186 // not a static method
192 void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
193  unsigned int bytes)
194 {
195  write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
196  bytes);
197  int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
198  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
199 }
200 
204 void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
205 {
206  write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
207  bytes);
208  int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
209  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
210 }
211 
221 void *
223 {
224  write_args *args = reinterpret_cast<write_args *>(arg);
225 
226  ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
227 
228 #if 0
229  struct timeval tp_s;
230  if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
231 #endif
232 
233  // force an error
234  // return (void*)-1;
235 
236  if (args->d_out_file != -1) {
237  int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
238  if (bytes_written != args->d_num)
239  return (void*) -1;
240  }
241  else {
242  args->d_out.write(args->d_buf, args->d_num);
243  if (args->d_out.fail()) {
244  ostringstream oss;
245  oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
246  args->d_error = oss.str();
247  return (void*) -1;
248  }
249  }
250 
251  delete [] args->d_buf;
252  delete args;
253 
254 #if 0
255  struct timeval tp_e;
256  if (print_time) {
257  if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
258 
259  cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
260  }
261 #endif
262 
263  return 0;
264 }
265 
278 void *
280 {
281  write_args *args = reinterpret_cast<write_args *>(arg);
282 
283  ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
284 
285  if (args->d_out_file != -1) {
286  int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
287  if (bytes_written != args->d_num) return (void*) -1;
288  }
289  else {
290  args->d_out.write(args->d_buf + 4, args->d_num);
291  if (args->d_out.fail()) {
292  ostringstream oss;
293  oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
294  args->d_error = oss.str();
295  return (void*) -1;
296  }
297  }
298 
299  delete [] args->d_buf;
300  delete args;
301 
302  return 0;
303 }
304 
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
A class for software fault reporting.
Definition: InternalErr.h:64
static void * write_thread_part(void *arg)
A class for error processing.
Definition: Error.h:90