monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <netxx_pipe.hh>
12#include "sanity.hh"
13#include "platform.hh"
14#include <netxx/streamserver.h>
15#include <ostream> // for operator<<
16
17#ifdef WIN32
18#include <windows.h>
19#include <io.h>
20#include <fcntl.h>
21#else
22#include <unistd.h>
23#include <fcntl.h>
24#include <sys/wait.h>
25#include <errno.h>
26#include <cstring> // strerror
27#endif
28
29using std::vector;
30using std::string;
31using std::make_pair;
32using std::exit;
33using std::perror;
34using std::strerror;
35
36Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
37 :
38#ifdef WIN32
39 child(INVALID_HANDLE_VALUE),
40 bytes_available(0),
41 read_in_progress(false)
42#else
43 readfd(_readfd),
44 writefd(_writefd),
45 child(0)
46#endif
47{
48#ifdef WIN32
49 if (_setmode(_readfd, _O_BINARY) == -1)
50 L(FL("failed to set input file descriptor to binary"));
51
52 if (_setmode(_writefd, _O_BINARY) == -1)
53 L(FL("failed to set output file descriptor to binary"));
54
55 named_pipe = (HANDLE)_get_osfhandle(_readfd);
56
57 E(named_pipe != INVALID_HANDLE_VALUE,
58 F("pipe handle is invalid"));
59
60 // Create infrastructure for overlapping I/O
61 memset(&overlap, 0, sizeof(overlap));
62 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
63 bytes_available = 0;
64 I(overlap.hEvent != 0);
65#else
66 int flags = fcntl(readfd, F_GETFL, 0);
67 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
68 flags = fcntl(writefd, F_GETFL, 0);
69 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
70#endif
71}
72
73
74#ifndef WIN32
75
// Create two pipes and fork a subprocess.
//
// fd1 carries data from the child to the parent, fd2 from the parent to
// the child.  In the child the read end of fd2 becomes stdin (fd 0) and
// the write end of fd1 becomes stdout (fd 1).  In the parent fd1[0] is
// left open for reading and fd2[1] for writing; the other two ends are
// closed.
//
// Returns -1 on error (every descriptor opened here has been closed
// again), 0 in the child and the child's PID in the parent.

static pid_t
pipe_and_fork(int fd1[2], int fd2[2])
{
  fd1[0] = fd1[1] = -1;
  fd2[0] = fd2[1] = -1;

  if (pipe(fd1))
    return -1;

  if (pipe(fd2))
    {
      close(fd1[0]);
      close(fd1[1]);
      return -1;
    }

  pid_t const result = fork();

  if (result < 0)
    {
      close(fd1[0]);
      close(fd1[1]);
      close(fd2[0]);
      close(fd2[1]);
      return -1;
    }

  if (result == 0)
    {
      // Child: fd1[1] for writing, fd2[0] for reading.
      close(fd1[0]);
      close(fd2[1]);
      if (dup2(fd2[0], 0) != 0 ||
          dup2(fd1[1], 1) != 1)
        {
          perror("dup2");
          // _exit, not exit: a forked child must not run the parent's
          // atexit handlers or flush its duplicated stdio buffers.
          _exit(-1); // kill the useless child
        }
      // If a pipe end already was fd 0 / fd 1 (the parent had that
      // descriptor closed), dup2 was a no-op and closing the "original"
      // would close the new stdin/stdout itself.
      if (fd1[1] != 1)
        close(fd1[1]);
      if (fd2[0] != 0)
        close(fd2[0]);
    }
  else
    {
      // Parent: fd1[0] for reading, fd2[1] for writing.
      close(fd1[1]);
      close(fd2[0]);
    }

  return result;
}
133#endif
134
135#ifdef WIN32
136static string
137err_msg()
138{
139 char buf[1024];
140 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
141 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
142 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
143 return string(buf);
144}
145#endif
146
147
148Netxx::PipeStream::PipeStream (const string & cmd,
149 const vector<string> & args)
150 :
151#ifdef WIN32
152 child(INVALID_HANDLE_VALUE),
153 bytes_available(0),
154 read_in_progress(false)
155#else
156 readfd(-1),
157 writefd(-1),
158 child(0)
159#endif
160{
161 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
162 // a vector<string> as argument.
163
164 const unsigned newsize = 64;
165 const char *newargv[newsize];
166 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
167
168 unsigned newargc = 0;
169 newargv[newargc++]=cmd.c_str();
170 for (vector<string>::const_iterator i = args.begin();
171 i != args.end(); ++i)
172 newargv[newargc++] = i->c_str();
173 newargv[newargc] = 0;
174
175#ifdef WIN32
176
177 // In order to use nonblocking i/o on windows, you must use named
178 // pipes and overlapped i/o. There is no other way, alas.
179
180 static unsigned long serial = 0;
181 string pipename = (FL("\\\\.\\pipe\\netxx_pipe_%ld_%d")
182 % GetCurrentProcessId()
183 % (++serial)).str();
184
185 // Create the parent's handle to the named pipe.
186
187 named_pipe = CreateNamedPipe(pipename.c_str(),
188 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
189 PIPE_TYPE_BYTE | PIPE_WAIT,
190 1,
191 sizeof(readbuf),
192 sizeof(readbuf),
193 1000,
194 0);
195
196 E(named_pipe != INVALID_HANDLE_VALUE,
197 F("CreateNamedPipe(%s,...) call failed: %s")
198 % pipename % err_msg());
199
200 // Open the child's handle to the named pipe.
201
202 SECURITY_ATTRIBUTES inherit;
203 memset(&inherit,0,sizeof inherit);
204 inherit.nLength=sizeof inherit;
205 inherit.bInheritHandle = TRUE;
206
207 HANDLE hpipe = CreateFile(pipename.c_str(),
208 GENERIC_READ|GENERIC_WRITE, 0,
209 &inherit,
210 OPEN_EXISTING,
211 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
212
213 E(hpipe != INVALID_HANDLE_VALUE,
214 F("CreateFile(%s,...) call failed: %s")
215 % pipename % err_msg());
216
217 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
218
219 PROCESS_INFORMATION piProcInfo;
220 STARTUPINFO siStartInfo;
221
222 memset(&piProcInfo, 0, sizeof(piProcInfo));
223 memset(&siStartInfo, 0, sizeof(siStartInfo));
224
225 siStartInfo.cb = sizeof(siStartInfo);
226 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
227 siStartInfo.hStdOutput = hpipe;
228 siStartInfo.hStdInput = hpipe;
229 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
230
231 string cmdline = munge_argv_into_cmdline(newargv);
232 L(FL("Subprocess command line: '%s'") % cmdline);
233
234 BOOL started = CreateProcess(NULL, // Application name
235 const_cast<CHAR*>(cmdline.c_str()),
236 NULL, // Process attributes
237 NULL, // Thread attributes
238 TRUE, // Inherit handles
239 0, // Creation flags
240 NULL, // Environment
241 NULL, // Current directory
242 &siStartInfo,
243 &piProcInfo);
244 E(started,
245 F("CreateProcess(%s,...) call failed: %s")
246 % cmdline % err_msg());
247
248 child = piProcInfo.hProcess;
249
250 // create infrastructure for overlapping I/O
251
252 memset(&overlap, 0, sizeof(overlap));
253 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
254 bytes_available = 0;
255 I(overlap.hEvent != 0);
256
257#else // !WIN32
258
259 int fd1[2], fd2[2];
260 child = pipe_and_fork(fd1, fd2);
261 E(child >= 0, F("pipe/fork failed: %s") % strerror(errno));
262 if (!child)
263 {
264 execvp(newargv[0], const_cast<char * const *>(newargv));
265 perror(newargv[0]);
266 exit(errno);
267 }
268 readfd = fd1[0];
269 writefd = fd2[1];
270 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
271#endif
272
273 // P(F("mtn %d: set up i/o channels")
274 // % GetCurrentProcessId());
275}
276
277// Non blocking read.
278
279Netxx::signed_size_type
280Netxx::PipeStream::read (void *buffer, size_type length)
281{
282#ifdef WIN32
283
284 if (length > bytes_available)
285 length = bytes_available;
286
287 if (length)
288 {
289 memcpy(buffer, readbuf, length);
290 if (length < bytes_available)
291 memmove(readbuf, readbuf+length, bytes_available-length);
292 bytes_available -= length;
293 }
294
295 return length;
296#else
297 return ::read(readfd, buffer, length);
298#endif
299}
300
301Netxx::signed_size_type
302Netxx::PipeStream::write(const void *buffer, size_type length)
303{
304#ifdef WIN32
305 DWORD written = 0;
306 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
307 E(ok, F("WriteFile call failed: %s") % err_msg());
308#else
309 size_t written = ::write(writefd, buffer, length);
310#endif
311 return written;
312}
313
314void
315Netxx::PipeStream::close (void)
316{
317
318#ifdef WIN32
319 if (named_pipe != INVALID_HANDLE_VALUE)
320 CloseHandle(named_pipe);
321 named_pipe = INVALID_HANDLE_VALUE;
322
323 if (overlap.hEvent != INVALID_HANDLE_VALUE)
324 CloseHandle(overlap.hEvent);
325 overlap.hEvent = INVALID_HANDLE_VALUE;
326
327 if (child != INVALID_HANDLE_VALUE)
328 WaitForSingleObject(child, INFINITE);
329 child = INVALID_HANDLE_VALUE;
330#else
331 if (readfd != -1)
332 ::close(readfd);
333 readfd = -1;
334
335 if (writefd != -1)
336 ::close(writefd);
337 writefd = -1;
338
339 if (child)
340 while (waitpid(child,0,0) == -1 && errno == EINTR) ;
341 child = 0;
342#endif
343}
344
345Netxx::socket_type
346Netxx::PipeStream::get_socketfd (void) const
347{
348#ifdef WIN32
349 return (Netxx::socket_type) named_pipe;
350#else
351 return Netxx::socket_type(-1);
352#endif
353}
354
355const Netxx::ProbeInfo*
356Netxx::PipeStream::get_probe_info (void) const
357{
358 return 0;
359}
360
361#ifdef WIN32
362
363static string
364status_name(DWORD wstatus)
365{
366 switch (wstatus) {
367 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
368 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
369 case WAIT_FAILED: return "WAIT_FAILED";
370 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
371 default: return "UNKNOWN";
372 }
373}
374
375Netxx::Probe::result_type
376Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
377{
378 if (!is_pipe)
379 return Probe::ready(timeout, rt);
380
381 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
382
383 if (rt == ready_none)
384 rt = ready_t; // remembered from add
385
386 if (rt & ready_write)
387 {
388 return make_pair(pipe->get_socketfd(), ready_write);
389 }
390
391 if (rt & ready_read)
392 {
393 if (pipe->bytes_available == 0)
394 {
395 // Issue an async request to fill our buffer.
396 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
397 sizeof(pipe->readbuf), NULL, &pipe->overlap);
398 E(ok || GetLastError() == ERROR_IO_PENDING,
399 F("ReadFile call failed: %s") % err_msg());
400 pipe->read_in_progress = true;
401 }
402
403 if (pipe->read_in_progress)
404 {
405 I(pipe->bytes_available == 0);
406
407 // Attempt to wait for the completion of the read-in-progress.
408
409 int milliseconds = ((timeout.get_sec() * 1000)
410 + (timeout.get_usec() / 1000));
411
412 L(FL("WaitForSingleObject(,%d)") % milliseconds);
413
414 DWORD wstatus = WAIT_FAILED;
415
416 if (pipe->child != INVALID_HANDLE_VALUE)
417 {
418
419 // We're a server; we're going to wait for the client to
420 // exit as well as the pipe read status, because
421 // apparently you don't find out about closed pipes
422 // during an overlapped read request (?)
423
424 HANDLE handles[2];
425 handles[0] = pipe->overlap.hEvent;
426 handles[1] = pipe->child;
427
428 wstatus = WaitForMultipleObjects(2,
429 handles,
430 FALSE,
431 milliseconds);
432
433 E(wstatus != WAIT_FAILED,
434 F("WaitForMultipleObjects call failed: %s") % err_msg());
435
436 if (wstatus == WAIT_OBJECT_0 + 1)
437 return make_pair(pipe->get_socketfd(), ready_oobd);
438 }
439 else
440 {
441 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
442 milliseconds);
443 E(wstatus != WAIT_FAILED,
444 F("WaitForSingleObject call failed: %s") % err_msg());
445 }
446
447 if (wstatus == WAIT_TIMEOUT)
448 return make_pair(-1, ready_none);
449
450 BOOL ok = GetOverlappedResult(pipe->named_pipe,
451 &pipe->overlap,
452 &pipe->bytes_available,
453 FALSE);
454
455 if (ok)
456 {
457 // We completed our read.
458 pipe->read_in_progress = false;
459 }
460 else
461 {
462 // We did not complete our read.
463 E(GetLastError() == ERROR_IO_INCOMPLETE,
464 F("GetOverlappedResult call failed: %s")
465 % err_msg());
466 }
467 }
468
469 if (pipe->bytes_available != 0)
470 {
471 return make_pair(pipe->get_socketfd(), ready_read);
472 }
473 }
474
475 return make_pair(pipe->get_socketfd(), ready_none);
476}
477
478void
479Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
480{
481 assert(!is_pipe);
482 assert(!pipe);
483 is_pipe = true;
484 pipe = &ps;
485 ready_t = rt;
486}
487
488void
489Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
490{
491 // FIXME: This is *still* an unfortunate way of performing a
492 // downcast, though slightly less awful than the old way, which
493 // involved throwing an exception.
494 //
495 // Perhaps we should twiddle the caller-visible API.
496
497 StreamBase const *sbp = &sb;
498 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
499 if (psp)
500 add(const_cast<PipeStream&>(*psp),rt);
501 else
502 {
503 assert(!is_pipe);
504 Probe::add(sb,rt);
505 }
506}
507
508void
509Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
510{
511 assert(!is_pipe);
512 Probe::add(ss,rt);
513}
514#else // unix
515void
516Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
517 {
518 if (rt == ready_none || rt & ready_read)
519 add_socket(ps.get_readfd(), ready_read);
520 if (rt == ready_none || rt & ready_write)
521 add_socket(ps.get_writefd(), ready_write);
522 }
523
524void
525Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
526{
527 try
528 {
529 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
530 }
531 catch (...)
532 {
533 Probe::add(sb,rt);
534 }
535}
536
537void
538Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
539{
540 Probe::add(ss,rt);
541}
542#endif
543
544#ifdef BUILD_UNIT_TESTS
545#include "unit_tests.hh"
546
547UNIT_TEST(pipe, simple_pipe)
548{ try
549 {
550 Netxx::PipeStream pipe("cat",vector<string>());
551
552 string result;
553 Netxx::PipeCompatibleProbe probe;
554 Netxx::Timeout timeout(2L), short_time(0,1000);
555
556 // time out because no data is available
557 probe.clear();
558 probe.add(pipe, Netxx::Probe::ready_read);
559 Netxx::Probe::result_type res = probe.ready(short_time);
560 I(res.second==Netxx::Probe::ready_none);
561
562 // write should be possible
563 probe.clear();
564 probe.add(pipe, Netxx::Probe::ready_write);
565 res = probe.ready(short_time);
566 I(res.second & Netxx::Probe::ready_write);
567#ifdef WIN32
568 I(res.first==pipe.get_socketfd());
569#else
570 I(res.first==pipe.get_writefd());
571#endif
572
573 // try binary transparency
574 for (int c = 0; c < 256; ++c)
575 {
576 char buf[1024];
577 buf[0] = c;
578 buf[1] = 255 - c;
579 pipe.write(buf, 2);
580
581 string result;
582 while (result.size() < 2)
583 { // wait for data to arrive
584 probe.clear();
585 probe.add(pipe, Netxx::Probe::ready_read);
586 res = probe.ready(timeout);
587 E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
588#ifdef WIN32
589 I(res.first == pipe.get_socketfd());
590#else
591 I(res.first == pipe.get_readfd());
592#endif
593 int bytes = pipe.read(buf, sizeof(buf));
594 result += string(buf, bytes);
595 }
596 I(result.size() == 2);
597 I(static_cast<unsigned char>(result[0]) == c);
598 I(static_cast<unsigned char>(result[1]) == 255 - c);
599 }
600
601 pipe.close();
602
603 }
604catch (informative_failure &e)
605 // for some reason boost does not provide
606 // enough information
607 {
608 W(F("Failure %s") % e.what());
609 throw;
610 }
611}
612#endif
613
614// Local Variables:
615// mode: C++
616// fill-column: 76
617// c-file-style: "gnu"
618// indent-tabs-mode: nil
619// End:
620// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status