monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <netxx_pipe.hh>
12#include "sanity.hh"
13#include "platform.hh"
14#include <netxx/streamserver.h>
15
16#ifdef WIN32
17#include <windows.h>
18#include <io.h>
19#include <fcntl.h>
20#else
21#include <unistd.h>
22#include <fcntl.h>
23#include <sys/wait.h>
24#include <errno.h>
25#endif
26
27using std::vector;
28using std::string;
29using std::make_pair;
30using std::exit;
31using std::perror;
32using std::strerror;
33
34Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
35 :
36#ifdef WIN32
37 child(INVALID_HANDLE_VALUE),
38 bytes_available(0),
39 read_in_progress(false)
40#else
41 readfd(_readfd),
42 writefd(_writefd),
43 child(0)
44#endif
45{
46#ifdef WIN32
47 if (_setmode(_readfd, _O_BINARY) == -1)
48 L(FL("failed to set input file descriptor to binary"));
49
50 if (_setmode(_writefd, _O_BINARY) == -1)
51 L(FL("failed to set output file descriptor to binary"));
52
53 named_pipe = (HANDLE)_get_osfhandle(_readfd);
54
55 E(named_pipe != INVALID_HANDLE_VALUE,
56 F("pipe handle is invalid"));
57
58 // Create infrastructure for overlapping I/O
59 memset(&overlap, 0, sizeof(overlap));
60 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
61 bytes_available = 0;
62 I(overlap.hEvent != 0);
63#else
64 int flags = fcntl(readfd, F_GETFL, 0);
65 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
66 flags = fcntl(writefd, F_GETFL, 0);
67 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
68#endif
69}
70
71
72#ifndef WIN32
73
// Create two pipes for a subprocess' stdio and fork.
//
// Returns -1 on error (every descriptor created so far is closed again and
// errno still describes the pipe()/fork() failure), 0 in the child and the
// child's PID in the parent.
//
// In the parent, fd1[0] is left open for reading what the child writes to
// its stdout and fd2[1] for writing to the child's stdin; the other two
// ends are closed.  In the child, fd2[0] has been installed as descriptor 0
// and fd1[1] as descriptor 1, and the original pipe descriptors are closed.

static pid_t
pipe_and_fork(int fd1[2], int fd2[2])
{
  fd1[0] = -1;
  fd1[1] = -1;
  fd2[0] = -1;
  fd2[1] = -1;

  if (pipe(fd1))
    return -1;

  if (pipe(fd2))
    {
      // Keep the errno from pipe() intact for the caller's error message.
      int const saved_errno = errno;
      close(fd1[0]);
      close(fd1[1]);
      errno = saved_errno;
      return -1;
    }

  pid_t const result = fork();

  if (result < 0)
    {
      // Keep the errno from fork() intact for the caller's error message.
      int const saved_errno = errno;
      close(fd1[0]);
      close(fd1[1]);
      close(fd2[0]);
      close(fd2[1]);
      errno = saved_errno;
      return -1;
    }

  if (result == 0)
    {
      // Child: fd1[1] for writing, fd2[0] for reading.
      close(fd1[0]);
      close(fd2[1]);
      if (dup2(fd2[0], 0) != 0 ||
          dup2(fd1[1], 1) != 1)
        {
          perror("dup2");
          // _exit, not exit: the forked child must not run the parent's
          // atexit handlers or flush stdio buffers it inherited.
          _exit(-1); // kill the useless child
        }
      // If the parent had descriptor 0 or 1 closed, a pipe end may already
      // *be* that descriptor; closing it would undo the dup2 above.
      if (fd1[1] != 1)
        close(fd1[1]);
      if (fd2[0] != 0)
        close(fd2[0]);
    }
  else
    {
      // Parent: fd1[0] for reading, fd2[1] for writing.
      close(fd1[1]);
      close(fd2[0]);
    }

  return result;
}
131#endif
132
133#ifdef WIN32
134static string
135err_msg()
136{
137 char buf[1024];
138 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
139 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
140 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
141 return string(buf);
142}
143#endif
144
145
146Netxx::PipeStream::PipeStream (const string & cmd,
147 const vector<string> & args)
148 :
149#ifdef WIN32
150 child(INVALID_HANDLE_VALUE),
151 bytes_available(0),
152 read_in_progress(false)
153#else
154 readfd(-1),
155 writefd(-1),
156 child(0)
157#endif
158{
159 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
160 // a vector<string> as argument.
161
162 const unsigned newsize = 64;
163 const char *newargv[newsize];
164 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
165
166 unsigned newargc = 0;
167 newargv[newargc++]=cmd.c_str();
168 for (vector<string>::const_iterator i = args.begin();
169 i != args.end(); ++i)
170 newargv[newargc++] = i->c_str();
171 newargv[newargc] = 0;
172
173#ifdef WIN32
174
175 // In order to use nonblocking i/o on windows, you must use named
176 // pipes and overlapped i/o. There is no other way, alas.
177
178 static unsigned long serial = 0;
179 string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d")
180 % GetCurrentProcessId()
181 % (++serial)).str();
182
183 // Create the parent's handle to the named pipe.
184
185 named_pipe = CreateNamedPipe(pipename.c_str(),
186 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
187 PIPE_TYPE_BYTE | PIPE_WAIT,
188 1,
189 sizeof(readbuf),
190 sizeof(readbuf),
191 1000,
192 0);
193
194 E(named_pipe != INVALID_HANDLE_VALUE,
195 F("CreateNamedPipe(%s,...) call failed: %s")
196 % pipename % err_msg());
197
198 // Open the child's handle to the named pipe.
199
200 SECURITY_ATTRIBUTES inherit;
201 memset(&inherit,0,sizeof inherit);
202 inherit.nLength=sizeof inherit;
203 inherit.bInheritHandle = TRUE;
204
205 HANDLE hpipe = CreateFile(pipename.c_str(),
206 GENERIC_READ|GENERIC_WRITE, 0,
207 &inherit,
208 OPEN_EXISTING,
209 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
210
211 E(hpipe != INVALID_HANDLE_VALUE,
212 F("CreateFile(%s,...) call failed: %s")
213 % pipename % err_msg());
214
215 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
216
217 PROCESS_INFORMATION piProcInfo;
218 STARTUPINFO siStartInfo;
219
220 memset(&piProcInfo, 0, sizeof(piProcInfo));
221 memset(&siStartInfo, 0, sizeof(siStartInfo));
222
223 siStartInfo.cb = sizeof(siStartInfo);
224 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
225 siStartInfo.hStdOutput = hpipe;
226 siStartInfo.hStdInput = hpipe;
227 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
228
229 string cmdline = munge_argv_into_cmdline(newargv);
230 L(FL("Subprocess command line: '%s'") % cmdline);
231
232 BOOL started = CreateProcess(NULL, // Application name
233 const_cast<CHAR*>(cmdline.c_str()),
234 NULL, // Process attributes
235 NULL, // Thread attributes
236 TRUE, // Inherit handles
237 0, // Creation flags
238 NULL, // Environment
239 NULL, // Current directory
240 &siStartInfo,
241 &piProcInfo);
242 E(started,
243 F("CreateProcess(%s,...) call failed: %s")
244 % cmdline % err_msg());
245
246 child = piProcInfo.hProcess;
247
248 // create infrastructure for overlapping I/O
249
250 memset(&overlap, 0, sizeof(overlap));
251 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
252 bytes_available = 0;
253 I(overlap.hEvent != 0);
254
255#else // !WIN32
256
257 int fd1[2], fd2[2];
258 child = pipe_and_fork(fd1, fd2);
259 E(child >= 0, F("pipe/fork failed %s") % strerror(errno));
260 if (!child)
261 {
262 execvp(newargv[0], const_cast<char * const *>(newargv));
263 perror(newargv[0]);
264 exit(errno);
265 }
266 readfd = fd1[0];
267 writefd = fd2[1];
268 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
269#endif
270
271 // P(F("mtn %d: set up i/o channels")
272 // % GetCurrentProcessId());
273}
274
275// Non blocking read.
276
277Netxx::signed_size_type
278Netxx::PipeStream::read (void *buffer, size_type length)
279{
280#ifdef WIN32
281
282 if (length > bytes_available)
283 length = bytes_available;
284
285 if (length)
286 {
287 memcpy(buffer, readbuf, length);
288 if (length < bytes_available)
289 memmove(readbuf, readbuf+length, bytes_available-length);
290 bytes_available -= length;
291 }
292
293 return length;
294#else
295 return ::read(readfd, buffer, length);
296#endif
297}
298
299Netxx::signed_size_type
300Netxx::PipeStream::write(const void *buffer, size_type length)
301{
302#ifdef WIN32
303 DWORD written = 0;
304 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
305 E(ok, F("WriteFile call failed: %s") % err_msg());
306#else
307 size_t written = ::write(writefd, buffer, length);
308#endif
309 return written;
310}
311
312void
313Netxx::PipeStream::close (void)
314{
315
316#ifdef WIN32
317 if (named_pipe != INVALID_HANDLE_VALUE)
318 CloseHandle(named_pipe);
319 named_pipe = INVALID_HANDLE_VALUE;
320
321 if (overlap.hEvent != INVALID_HANDLE_VALUE)
322 CloseHandle(overlap.hEvent);
323 overlap.hEvent = INVALID_HANDLE_VALUE;
324
325 if (child != INVALID_HANDLE_VALUE)
326 WaitForSingleObject(child, INFINITE);
327 child = INVALID_HANDLE_VALUE;
328#else
329 if (readfd != -1)
330 ::close(readfd);
331 readfd = -1;
332
333 if (writefd != -1)
334 ::close(writefd);
335 writefd = -1;
336
337 if (child)
338 waitpid(child,0,0);
339 child = 0;
340#endif
341}
342
343Netxx::socket_type
344Netxx::PipeStream::get_socketfd (void) const
345{
346#ifdef WIN32
347 return (Netxx::socket_type) named_pipe;
348#else
349 return Netxx::socket_type(-1);
350#endif
351}
352
353const Netxx::ProbeInfo*
354Netxx::PipeStream::get_probe_info (void) const
355{
356 return 0;
357}
358
359#ifdef WIN32
360
361static string
362status_name(DWORD wstatus)
363{
364 switch (wstatus) {
365 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
366 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
367 case WAIT_FAILED: return "WAIT_FAILED";
368 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
369 default: return "UNKNOWN";
370 }
371}
372
373Netxx::Probe::result_type
374Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
375{
376 if (!is_pipe)
377 return Probe::ready(timeout, rt);
378
379 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
380
381 if (rt == ready_none)
382 rt = ready_t; // remembered from add
383
384 if (rt & ready_write)
385 {
386 return make_pair(pipe->get_socketfd(), ready_write);
387 }
388
389 if (rt & ready_read)
390 {
391 if (pipe->bytes_available == 0)
392 {
393 // Issue an async request to fill our buffer.
394 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
395 sizeof(pipe->readbuf), NULL, &pipe->overlap);
396 E(ok || GetLastError() == ERROR_IO_PENDING,
397 F("ReadFile call failed: %s") % err_msg());
398 pipe->read_in_progress = true;
399 }
400
401 if (pipe->read_in_progress)
402 {
403 I(pipe->bytes_available == 0);
404
405 // Attempt to wait for the completion of the read-in-progress.
406
407 int milliseconds = ((timeout.get_sec() * 1000)
408 + (timeout.get_usec() / 1000));
409
410 L(FL("WaitForSingleObject(,%d)") % milliseconds);
411
412 DWORD wstatus = WAIT_FAILED;
413
414 if (pipe->child != INVALID_HANDLE_VALUE)
415 {
416
417 // We're a server; we're going to wait for the client to
418 // exit as well as the pipe read status, because
419 // apparently you don't find out about closed pipes
420 // during an overlapped read request (?)
421
422 HANDLE handles[2];
423 handles[0] = pipe->overlap.hEvent;
424 handles[1] = pipe->child;
425
426 wstatus = WaitForMultipleObjects(2,
427 handles,
428 FALSE,
429 milliseconds);
430
431 E(wstatus != WAIT_FAILED,
432 F("WaitForMultipleObjects call failed: %s") % err_msg());
433
434 if (wstatus == WAIT_OBJECT_0 + 1)
435 return make_pair(pipe->get_socketfd(), ready_oobd);
436 }
437 else
438 {
439 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
440 milliseconds);
441 E(wstatus != WAIT_FAILED,
442 F("WaitForSingleObject call failed: %s") % err_msg());
443 }
444
445 if (wstatus == WAIT_TIMEOUT)
446 return make_pair(-1, ready_none);
447
448 BOOL ok = GetOverlappedResult(pipe->named_pipe,
449 &pipe->overlap,
450 &pipe->bytes_available,
451 FALSE);
452
453 if (ok)
454 {
455 // We completed our read.
456 pipe->read_in_progress = false;
457 }
458 else
459 {
460 // We did not complete our read.
461 E(GetLastError() == ERROR_IO_INCOMPLETE,
462 F("GetOverlappedResult call failed: %s")
463 % err_msg());
464 }
465 }
466
467 if (pipe->bytes_available != 0)
468 {
469 return make_pair(pipe->get_socketfd(), ready_read);
470 }
471 }
472
473 return make_pair(pipe->get_socketfd(), ready_none);
474}
475
476void
477Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
478{
479 assert(!is_pipe);
480 assert(!pipe);
481 is_pipe = true;
482 pipe = &ps;
483 ready_t = rt;
484}
485
486void
487Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
488{
489 // FIXME: This is *still* an unfortunate way of performing a
490 // downcast, though slightly less awful than the old way, which
491 // involved throwing an exception.
492 //
493 // Perhaps we should twiddle the caller-visible API.
494
495 StreamBase const *sbp = &sb;
496 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
497 if (psp)
498 add(const_cast<PipeStream&>(*psp),rt);
499 else
500 {
501 assert(!is_pipe);
502 Probe::add(sb,rt);
503 }
504}
505
506void
507Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
508{
509 assert(!is_pipe);
510 Probe::add(ss,rt);
511}
512#else // unix
513void
514Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
515 {
516 if (rt == ready_none || rt & ready_read)
517 add_socket(ps.get_readfd(), ready_read);
518 if (rt == ready_none || rt & ready_write)
519 add_socket(ps.get_writefd(), ready_write);
520 }
521
522void
523Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
524{
525 try
526 {
527 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
528 }
529 catch (...)
530 {
531 Probe::add(sb,rt);
532 }
533}
534
535void
536Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
537{
538 Probe::add(ss,rt);
539}
540#endif
541
542#ifdef BUILD_UNIT_TESTS
543#include "unit_tests.hh"
544
545UNIT_TEST(pipe, simple_pipe)
546{ try
547 {
548 Netxx::PipeStream pipe("cat",vector<string>());
549
550 string result;
551 Netxx::PipeCompatibleProbe probe;
552 Netxx::Timeout timeout(2L), short_time(0,1000);
553
554 // time out because no data is available
555 probe.clear();
556 probe.add(pipe, Netxx::Probe::ready_read);
557 Netxx::Probe::result_type res = probe.ready(short_time);
558 I(res.second==Netxx::Probe::ready_none);
559
560 // write should be possible
561 probe.clear();
562 probe.add(pipe, Netxx::Probe::ready_write);
563 res = probe.ready(short_time);
564 I(res.second & Netxx::Probe::ready_write);
565#ifdef WIN32
566 I(res.first==pipe.get_socketfd());
567#else
568 I(res.first==pipe.get_writefd());
569#endif
570
571 // try binary transparency
572 for (int c = 0; c < 256; ++c)
573 {
574 char buf[1024];
575 buf[0] = c;
576 buf[1] = 255 - c;
577 pipe.write(buf, 2);
578
579 string result;
580 while (result.size() < 2)
581 { // wait for data to arrive
582 probe.clear();
583 probe.add(pipe, Netxx::Probe::ready_read);
584 res = probe.ready(timeout);
585 E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
586#ifdef WIN32
587 I(res.first == pipe.get_socketfd());
588#else
589 I(res.first == pipe.get_readfd());
590#endif
591 int bytes = pipe.read(buf, sizeof(buf));
592 result += string(buf, bytes);
593 }
594 I(result.size() == 2);
595 I(static_cast<unsigned char>(result[0]) == c);
596 I(static_cast<unsigned char>(result[1]) == 255 - c);
597 }
598
599 pipe.close();
600
601 }
602catch (informative_failure &e)
603 // for some reason boost does not provide
604 // enough information
605 {
606 W(F("Failure %s") % e.what());
607 throw;
608 }
609}
610#endif
611
612// Local Variables:
613// mode: C++
614// fill-column: 76
615// c-file-style: "gnu"
616// indent-tabs-mode: nil
617// End:
618// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status