monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <netxx_pipe.hh>
12#include "sanity.hh"
13#include "platform.hh"
14#include <netxx/streamserver.h>
15#include <ostream> // for operator<<
16
17#ifdef WIN32
18#include <windows.h>
19#include <io.h>
20#include <fcntl.h>
21#else
22#include <unistd.h>
23#include <fcntl.h>
24#include <sys/wait.h>
25#include <errno.h>
26#endif
27
28using std::vector;
29using std::string;
30using std::make_pair;
31using std::exit;
32using std::perror;
33using std::strerror;
34
35Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
36 :
37#ifdef WIN32
38 child(INVALID_HANDLE_VALUE),
39 bytes_available(0),
40 read_in_progress(false)
41#else
42 readfd(_readfd),
43 writefd(_writefd),
44 child(0)
45#endif
46{
47#ifdef WIN32
48 if (_setmode(_readfd, _O_BINARY) == -1)
49 L(FL("failed to set input file descriptor to binary"));
50
51 if (_setmode(_writefd, _O_BINARY) == -1)
52 L(FL("failed to set output file descriptor to binary"));
53
54 named_pipe = (HANDLE)_get_osfhandle(_readfd);
55
56 E(named_pipe != INVALID_HANDLE_VALUE,
57 F("pipe handle is invalid"));
58
59 // Create infrastructure for overlapping I/O
60 memset(&overlap, 0, sizeof(overlap));
61 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
62 bytes_available = 0;
63 I(overlap.hEvent != 0);
64#else
65 int flags = fcntl(readfd, F_GETFL, 0);
66 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
67 flags = fcntl(writefd, F_GETFL, 0);
68 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
69#endif
70}
71
72
73#ifndef WIN32
74
75// Create pipes for stdio and fork subprocess, returns -1 on error, 0
76// to child and PID to parent.
77
78static pid_t
79pipe_and_fork(int fd1[2], int fd2[2])
80{
81 pid_t result = -1;
82 fd1[0] = -1;
83 fd1[1] = -1;
84 fd2[0] = -1;
85 fd2[1] = -1;
86
87 if (pipe(fd1))
88 return -1;
89
90 if (pipe(fd2))
91 {
92 close(fd1[0]);
93 close(fd1[1]);
94 return -1;
95 }
96
97 result = fork();
98
99 if (result < 0)
100 {
101 close(fd1[0]);
102 close(fd1[1]);
103 close(fd2[0]);
104 close(fd2[1]);
105 return -1;
106 }
107
108 else if (!result)
109 {
110 // fd1[1] for writing, fd2[0] for reading
111 close(fd1[0]);
112 close(fd2[1]);
113 if (dup2(fd2[0], 0) != 0 ||
114 dup2(fd1[1], 1) != 1)
115 {
116 perror("dup2");
117 exit(-1); // kill the useless child
118 }
119 close(fd1[1]);
120 close(fd2[0]);
121 }
122
123 else
124 {
125 // fd1[0] for reading, fd2[1] for writing
126 close(fd1[1]);
127 close(fd2[0]);
128 }
129
130 return result;
131}
132#endif
133
134#ifdef WIN32
135static string
136err_msg()
137{
138 char buf[1024];
139 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
140 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
141 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
142 return string(buf);
143}
144#endif
145
146
147Netxx::PipeStream::PipeStream (const string & cmd,
148 const vector<string> & args)
149 :
150#ifdef WIN32
151 child(INVALID_HANDLE_VALUE),
152 bytes_available(0),
153 read_in_progress(false)
154#else
155 readfd(-1),
156 writefd(-1),
157 child(0)
158#endif
159{
160 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
161 // a vector<string> as argument.
162
163 const unsigned newsize = 64;
164 const char *newargv[newsize];
165 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
166
167 unsigned newargc = 0;
168 newargv[newargc++]=cmd.c_str();
169 for (vector<string>::const_iterator i = args.begin();
170 i != args.end(); ++i)
171 newargv[newargc++] = i->c_str();
172 newargv[newargc] = 0;
173
174#ifdef WIN32
175
176 // In order to use nonblocking i/o on windows, you must use named
177 // pipes and overlapped i/o. There is no other way, alas.
178
179 static unsigned long serial = 0;
180 string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d")
181 % GetCurrentProcessId()
182 % (++serial)).str();
183
184 // Create the parent's handle to the named pipe.
185
186 named_pipe = CreateNamedPipe(pipename.c_str(),
187 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
188 PIPE_TYPE_BYTE | PIPE_WAIT,
189 1,
190 sizeof(readbuf),
191 sizeof(readbuf),
192 1000,
193 0);
194
195 E(named_pipe != INVALID_HANDLE_VALUE,
196 F("CreateNamedPipe(%s,...) call failed: %s")
197 % pipename % err_msg());
198
199 // Open the child's handle to the named pipe.
200
201 SECURITY_ATTRIBUTES inherit;
202 memset(&inherit,0,sizeof inherit);
203 inherit.nLength=sizeof inherit;
204 inherit.bInheritHandle = TRUE;
205
206 HANDLE hpipe = CreateFile(pipename.c_str(),
207 GENERIC_READ|GENERIC_WRITE, 0,
208 &inherit,
209 OPEN_EXISTING,
210 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
211
212 E(hpipe != INVALID_HANDLE_VALUE,
213 F("CreateFile(%s,...) call failed: %s")
214 % pipename % err_msg());
215
216 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
217
218 PROCESS_INFORMATION piProcInfo;
219 STARTUPINFO siStartInfo;
220
221 memset(&piProcInfo, 0, sizeof(piProcInfo));
222 memset(&siStartInfo, 0, sizeof(siStartInfo));
223
224 siStartInfo.cb = sizeof(siStartInfo);
225 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
226 siStartInfo.hStdOutput = hpipe;
227 siStartInfo.hStdInput = hpipe;
228 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
229
230 string cmdline = munge_argv_into_cmdline(newargv);
231 L(FL("Subprocess command line: '%s'") % cmdline);
232
233 BOOL started = CreateProcess(NULL, // Application name
234 const_cast<CHAR*>(cmdline.c_str()),
235 NULL, // Process attributes
236 NULL, // Thread attributes
237 TRUE, // Inherit handles
238 0, // Creation flags
239 NULL, // Environment
240 NULL, // Current directory
241 &siStartInfo,
242 &piProcInfo);
243 E(started,
244 F("CreateProcess(%s,...) call failed: %s")
245 % cmdline % err_msg());
246
247 child = piProcInfo.hProcess;
248
249 // create infrastructure for overlapping I/O
250
251 memset(&overlap, 0, sizeof(overlap));
252 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
253 bytes_available = 0;
254 I(overlap.hEvent != 0);
255
256#else // !WIN32
257
258 int fd1[2], fd2[2];
259 child = pipe_and_fork(fd1, fd2);
260 E(child >= 0, F("pipe/fork failed %s") % strerror(errno));
261 if (!child)
262 {
263 execvp(newargv[0], const_cast<char * const *>(newargv));
264 perror(newargv[0]);
265 exit(errno);
266 }
267 readfd = fd1[0];
268 writefd = fd2[1];
269 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
270#endif
271
272 // P(F("mtn %d: set up i/o channels")
273 // % GetCurrentProcessId());
274}
275
276// Non blocking read.
277
278Netxx::signed_size_type
279Netxx::PipeStream::read (void *buffer, size_type length)
280{
281#ifdef WIN32
282
283 if (length > bytes_available)
284 length = bytes_available;
285
286 if (length)
287 {
288 memcpy(buffer, readbuf, length);
289 if (length < bytes_available)
290 memmove(readbuf, readbuf+length, bytes_available-length);
291 bytes_available -= length;
292 }
293
294 return length;
295#else
296 return ::read(readfd, buffer, length);
297#endif
298}
299
300Netxx::signed_size_type
301Netxx::PipeStream::write(const void *buffer, size_type length)
302{
303#ifdef WIN32
304 DWORD written = 0;
305 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
306 E(ok, F("WriteFile call failed: %s") % err_msg());
307#else
308 size_t written = ::write(writefd, buffer, length);
309#endif
310 return written;
311}
312
313void
314Netxx::PipeStream::close (void)
315{
316
317#ifdef WIN32
318 if (named_pipe != INVALID_HANDLE_VALUE)
319 CloseHandle(named_pipe);
320 named_pipe = INVALID_HANDLE_VALUE;
321
322 if (overlap.hEvent != INVALID_HANDLE_VALUE)
323 CloseHandle(overlap.hEvent);
324 overlap.hEvent = INVALID_HANDLE_VALUE;
325
326 if (child != INVALID_HANDLE_VALUE)
327 WaitForSingleObject(child, INFINITE);
328 child = INVALID_HANDLE_VALUE;
329#else
330 if (readfd != -1)
331 ::close(readfd);
332 readfd = -1;
333
334 if (writefd != -1)
335 ::close(writefd);
336 writefd = -1;
337
338 if (child)
339 while (waitpid(child,0,0) == -1 && errno == EINTR);
340 child = 0;
341#endif
342}
343
344Netxx::socket_type
345Netxx::PipeStream::get_socketfd (void) const
346{
347#ifdef WIN32
348 return (Netxx::socket_type) named_pipe;
349#else
350 return Netxx::socket_type(-1);
351#endif
352}
353
354const Netxx::ProbeInfo*
355Netxx::PipeStream::get_probe_info (void) const
356{
357 return 0;
358}
359
360#ifdef WIN32
361
362static string
363status_name(DWORD wstatus)
364{
365 switch (wstatus) {
366 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
367 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
368 case WAIT_FAILED: return "WAIT_FAILED";
369 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
370 default: return "UNKNOWN";
371 }
372}
373
374Netxx::Probe::result_type
375Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
376{
377 if (!is_pipe)
378 return Probe::ready(timeout, rt);
379
380 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
381
382 if (rt == ready_none)
383 rt = ready_t; // remembered from add
384
385 if (rt & ready_write)
386 {
387 return make_pair(pipe->get_socketfd(), ready_write);
388 }
389
390 if (rt & ready_read)
391 {
392 if (pipe->bytes_available == 0)
393 {
394 // Issue an async request to fill our buffer.
395 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
396 sizeof(pipe->readbuf), NULL, &pipe->overlap);
397 E(ok || GetLastError() == ERROR_IO_PENDING,
398 F("ReadFile call failed: %s") % err_msg());
399 pipe->read_in_progress = true;
400 }
401
402 if (pipe->read_in_progress)
403 {
404 I(pipe->bytes_available == 0);
405
406 // Attempt to wait for the completion of the read-in-progress.
407
408 int milliseconds = ((timeout.get_sec() * 1000)
409 + (timeout.get_usec() / 1000));
410
411 L(FL("WaitForSingleObject(,%d)") % milliseconds);
412
413 DWORD wstatus = WAIT_FAILED;
414
415 if (pipe->child != INVALID_HANDLE_VALUE)
416 {
417
418 // We're a server; we're going to wait for the client to
419 // exit as well as the pipe read status, because
420 // apparently you don't find out about closed pipes
421 // during an overlapped read request (?)
422
423 HANDLE handles[2];
424 handles[0] = pipe->overlap.hEvent;
425 handles[1] = pipe->child;
426
427 wstatus = WaitForMultipleObjects(2,
428 handles,
429 FALSE,
430 milliseconds);
431
432 E(wstatus != WAIT_FAILED,
433 F("WaitForMultipleObjects call failed: %s") % err_msg());
434
435 if (wstatus == WAIT_OBJECT_0 + 1)
436 return make_pair(pipe->get_socketfd(), ready_oobd);
437 }
438 else
439 {
440 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
441 milliseconds);
442 E(wstatus != WAIT_FAILED,
443 F("WaitForSingleObject call failed: %s") % err_msg());
444 }
445
446 if (wstatus == WAIT_TIMEOUT)
447 return make_pair(-1, ready_none);
448
449 BOOL ok = GetOverlappedResult(pipe->named_pipe,
450 &pipe->overlap,
451 &pipe->bytes_available,
452 FALSE);
453
454 if (ok)
455 {
456 // We completed our read.
457 pipe->read_in_progress = false;
458 }
459 else
460 {
461 // We did not complete our read.
462 E(GetLastError() == ERROR_IO_INCOMPLETE,
463 F("GetOverlappedResult call failed: %s")
464 % err_msg());
465 }
466 }
467
468 if (pipe->bytes_available != 0)
469 {
470 return make_pair(pipe->get_socketfd(), ready_read);
471 }
472 }
473
474 return make_pair(pipe->get_socketfd(), ready_none);
475}
476
477void
478Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
479{
480 assert(!is_pipe);
481 assert(!pipe);
482 is_pipe = true;
483 pipe = &ps;
484 ready_t = rt;
485}
486
487void
488Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
489{
490 // FIXME: This is *still* an unfortunate way of performing a
491 // downcast, though slightly less awful than the old way, which
492 // involved throwing an exception.
493 //
494 // Perhaps we should twiddle the caller-visible API.
495
496 StreamBase const *sbp = &sb;
497 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
498 if (psp)
499 add(const_cast<PipeStream&>(*psp),rt);
500 else
501 {
502 assert(!is_pipe);
503 Probe::add(sb,rt);
504 }
505}
506
507void
508Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
509{
510 assert(!is_pipe);
511 Probe::add(ss,rt);
512}
513#else // unix
514void
515Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
516 {
517 if (rt == ready_none || rt & ready_read)
518 add_socket(ps.get_readfd(), ready_read);
519 if (rt == ready_none || rt & ready_write)
520 add_socket(ps.get_writefd(), ready_write);
521 }
522
523void
524Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
525{
526 try
527 {
528 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
529 }
530 catch (...)
531 {
532 Probe::add(sb,rt);
533 }
534}
535
536void
537Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
538{
539 Probe::add(ss,rt);
540}
541#endif
542
543#ifdef BUILD_UNIT_TESTS
544#include "unit_tests.hh"
545
546UNIT_TEST(pipe, simple_pipe)
547{ try
548 {
549 Netxx::PipeStream pipe("cat",vector<string>());
550
551 string result;
552 Netxx::PipeCompatibleProbe probe;
553 Netxx::Timeout timeout(2L), short_time(0,1000);
554
555 // time out because no data is available
556 probe.clear();
557 probe.add(pipe, Netxx::Probe::ready_read);
558 Netxx::Probe::result_type res = probe.ready(short_time);
559 I(res.second==Netxx::Probe::ready_none);
560
561 // write should be possible
562 probe.clear();
563 probe.add(pipe, Netxx::Probe::ready_write);
564 res = probe.ready(short_time);
565 I(res.second & Netxx::Probe::ready_write);
566#ifdef WIN32
567 I(res.first==pipe.get_socketfd());
568#else
569 I(res.first==pipe.get_writefd());
570#endif
571
572 // try binary transparency
573 for (int c = 0; c < 256; ++c)
574 {
575 char buf[1024];
576 buf[0] = c;
577 buf[1] = 255 - c;
578 pipe.write(buf, 2);
579
580 string result;
581 while (result.size() < 2)
582 { // wait for data to arrive
583 probe.clear();
584 probe.add(pipe, Netxx::Probe::ready_read);
585 res = probe.ready(timeout);
586 E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
587#ifdef WIN32
588 I(res.first == pipe.get_socketfd());
589#else
590 I(res.first == pipe.get_readfd());
591#endif
592 int bytes = pipe.read(buf, sizeof(buf));
593 result += string(buf, bytes);
594 }
595 I(result.size() == 2);
596 I(static_cast<unsigned char>(result[0]) == c);
597 I(static_cast<unsigned char>(result[1]) == 255 - c);
598 }
599
600 pipe.close();
601
602 }
603catch (informative_failure &e)
604 // for some reason boost does not provide
605 // enough information
606 {
607 W(F("Failure %s") % e.what());
608 throw;
609 }
610}
611#endif
612
613// Local Variables:
614// mode: C++
615// fill-column: 76
616// c-file-style: "gnu"
617// indent-tabs-mode: nil
618// End:
619// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status