monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <netxx_pipe.hh>
11#include "sanity.hh"
12#include "platform.hh"
13#include <netxx/streamserver.h>
14
15#ifdef WIN32
16#include <windows.h>
17#include <io.h>
18#include <fcntl.h>
19#else
20#include <unistd.h>
21#include <fcntl.h>
22#include <sys/wait.h>
23#include <errno.h>
24#endif
25
26using std::vector;
27using std::string;
28using std::make_pair;
29using std::exit;
30using std::perror;
31using std::strerror;
32
33Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
34 :
35#ifdef WIN32
36 child(INVALID_HANDLE_VALUE),
37 bytes_available(0),
38 read_in_progress(false)
39#else
40 readfd(_readfd),
41 writefd(_writefd),
42 child(0)
43#endif
44{
45#ifdef WIN32
46 if (_setmode(_readfd, _O_BINARY) == -1)
47 L(FL("failed to set input file descriptor to binary"));
48
49 if (_setmode(_writefd, _O_BINARY) == -1)
50 L(FL("failed to set output file descriptor to binary"));
51
52 named_pipe = (HANDLE)_get_osfhandle(_readfd);
53
54 E(named_pipe != INVALID_HANDLE_VALUE,
55 F("pipe handle is invalid"));
56
57 // Create infrastructure for overlapping I/O
58 memset(&overlap, 0, sizeof(overlap));
59 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
60 bytes_available = 0;
61 I(overlap.hEvent != 0);
62#else
63 int flags = fcntl(readfd, F_GETFL, 0);
64 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
65 flags = fcntl(writefd, F_GETFL, 0);
66 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
67#endif
68}
69
70
71#ifndef WIN32
72
73// Create pipes for stdio and fork subprocess, returns -1 on error, 0
74// to child and PID to parent.
75
76static pid_t
77pipe_and_fork(int fd1[2], int fd2[2])
78{
79 pid_t result = -1;
80 fd1[0] = -1;
81 fd1[1] = -1;
82 fd2[0] = -1;
83 fd2[1] = -1;
84
85 if (pipe(fd1))
86 return -1;
87
88 if (pipe(fd2))
89 {
90 close(fd1[0]);
91 close(fd1[1]);
92 return -1;
93 }
94
95 result = fork();
96
97 if (result < 0)
98 {
99 close(fd1[0]);
100 close(fd1[1]);
101 close(fd2[0]);
102 close(fd2[1]);
103 return -1;
104 }
105
106 else if (!result)
107 {
108 // fd1[1] for writing, fd2[0] for reading
109 close(fd1[0]);
110 close(fd2[1]);
111 if (dup2(fd2[0], 0) != 0 ||
112 dup2(fd1[1], 1) != 1)
113 {
114 perror("dup2");
115 exit(-1); // kill the useless child
116 }
117 close(fd1[1]);
118 close(fd2[0]);
119 }
120
121 else
122 {
123 // fd1[0] for reading, fd2[1] for writing
124 close(fd1[1]);
125 close(fd2[0]);
126 }
127
128 return result;
129}
130#endif
131
132#ifdef WIN32
133static string
134err_msg()
135{
136 char buf[1024];
137 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
138 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
139 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
140 return string(buf);
141}
142#endif
143
144
145Netxx::PipeStream::PipeStream (const string & cmd,
146 const vector<string> & args)
147 :
148#ifdef WIN32
149 child(INVALID_HANDLE_VALUE),
150 bytes_available(0),
151 read_in_progress(false)
152#else
153 readfd(-1),
154 writefd(-1),
155 child(0)
156#endif
157{
158 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
159 // a vector<string> as argument.
160
161 const unsigned newsize = 64;
162 const char *newargv[newsize];
163 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
164
165 unsigned newargc = 0;
166 newargv[newargc++]=cmd.c_str();
167 for (vector<string>::const_iterator i = args.begin();
168 i != args.end(); ++i)
169 newargv[newargc++] = i->c_str();
170 newargv[newargc] = 0;
171
172#ifdef WIN32
173
174 // In order to use nonblocking i/o on windows, you must use named
175 // pipes and overlapped i/o. There is no other way, alas.
176
177 static unsigned long serial = 0;
178 string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d")
179 % GetCurrentProcessId()
180 % (++serial)).str();
181
182 // Create the parent's handle to the named pipe.
183
184 named_pipe = CreateNamedPipe(pipename.c_str(),
185 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
186 PIPE_TYPE_BYTE | PIPE_WAIT,
187 1,
188 sizeof(readbuf),
189 sizeof(readbuf),
190 1000,
191 0);
192
193 E(named_pipe != INVALID_HANDLE_VALUE,
194 F("CreateNamedPipe(%s,...) call failed: %s")
195 % pipename % err_msg());
196
197 // Open the child's handle to the named pipe.
198
199 SECURITY_ATTRIBUTES inherit;
200 memset(&inherit,0,sizeof inherit);
201 inherit.nLength=sizeof inherit;
202 inherit.bInheritHandle = TRUE;
203
204 HANDLE hpipe = CreateFile(pipename.c_str(),
205 GENERIC_READ|GENERIC_WRITE, 0,
206 &inherit,
207 OPEN_EXISTING,
208 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
209
210 E(hpipe != INVALID_HANDLE_VALUE,
211 F("CreateFile(%s,...) call failed: %s")
212 % pipename % err_msg());
213
214 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
215
216 PROCESS_INFORMATION piProcInfo;
217 STARTUPINFO siStartInfo;
218
219 memset(&piProcInfo, 0, sizeof(piProcInfo));
220 memset(&siStartInfo, 0, sizeof(siStartInfo));
221
222 siStartInfo.cb = sizeof(siStartInfo);
223 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
224 siStartInfo.hStdOutput = hpipe;
225 siStartInfo.hStdInput = hpipe;
226 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
227
228 string cmdline = munge_argv_into_cmdline(newargv);
229 L(FL("Subprocess command line: '%s'") % cmdline);
230
231 BOOL started = CreateProcess(NULL, // Application name
232 const_cast<CHAR*>(cmdline.c_str()),
233 NULL, // Process attributes
234 NULL, // Thread attributes
235 TRUE, // Inherit handles
236 0, // Creation flags
237 NULL, // Environment
238 NULL, // Current directory
239 &siStartInfo,
240 &piProcInfo);
241 E(started,
242 F("CreateProcess(%s,...) call failed: %s")
243 % cmdline % err_msg());
244
245 child = piProcInfo.hProcess;
246
247 // create infrastructure for overlapping I/O
248
249 memset(&overlap, 0, sizeof(overlap));
250 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
251 bytes_available = 0;
252 I(overlap.hEvent != 0);
253
254#else // !WIN32
255
256 int fd1[2], fd2[2];
257 child = pipe_and_fork(fd1, fd2);
258 E(child >= 0, F("pipe/fork failed %s") % strerror(errno));
259 if (!child)
260 {
261 execvp(newargv[0], const_cast<char * const *>(newargv));
262 perror(newargv[0]);
263 exit(errno);
264 }
265 readfd = fd1[0];
266 writefd = fd2[1];
267 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
268#endif
269
270 // P(F("mtn %d: set up i/o channels")
271 // % GetCurrentProcessId());
272}
273
274// Non blocking read.
275
276Netxx::signed_size_type
277Netxx::PipeStream::read (void *buffer, size_type length)
278{
279#ifdef WIN32
280
281 if (length > bytes_available)
282 length = bytes_available;
283
284 if (length)
285 {
286 memcpy(buffer, readbuf, length);
287 if (length < bytes_available)
288 memmove(readbuf, readbuf+length, bytes_available-length);
289 bytes_available -= length;
290 }
291
292 return length;
293#else
294 return ::read(readfd, buffer, length);
295#endif
296}
297
298Netxx::signed_size_type
299Netxx::PipeStream::write(const void *buffer, size_type length)
300{
301#ifdef WIN32
302 DWORD written = 0;
303 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
304 E(ok, F("WriteFile call failed: %s") % err_msg());
305#else
306 size_t written = ::write(writefd, buffer, length);
307#endif
308 return written;
309}
310
311void
312Netxx::PipeStream::close (void)
313{
314
315#ifdef WIN32
316 if (named_pipe != INVALID_HANDLE_VALUE)
317 CloseHandle(named_pipe);
318 named_pipe = INVALID_HANDLE_VALUE;
319
320 if (overlap.hEvent != INVALID_HANDLE_VALUE)
321 CloseHandle(overlap.hEvent);
322 overlap.hEvent = INVALID_HANDLE_VALUE;
323
324 if (child != INVALID_HANDLE_VALUE)
325 WaitForSingleObject(child, INFINITE);
326 child = INVALID_HANDLE_VALUE;
327#else
328 if (readfd != -1)
329 ::close(readfd);
330 readfd = -1;
331
332 if (writefd != -1)
333 ::close(writefd);
334 writefd = -1;
335
336 if (child)
337 waitpid(child,0,0);
338 child = 0;
339#endif
340}
341
342Netxx::socket_type
343Netxx::PipeStream::get_socketfd (void) const
344{
345#ifdef WIN32
346 return (Netxx::socket_type) named_pipe;
347#else
348 return Netxx::socket_type(-1);
349#endif
350}
351
352const Netxx::ProbeInfo*
353Netxx::PipeStream::get_probe_info (void) const
354{
355 return 0;
356}
357
358#ifdef WIN32
359
360static string
361status_name(DWORD wstatus)
362{
363 switch (wstatus) {
364 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
365 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
366 case WAIT_FAILED: return "WAIT_FAILED";
367 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
368 default: return "UNKNOWN";
369 }
370}
371
372Netxx::Probe::result_type
373Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
374{
375 if (!is_pipe)
376 return Probe::ready(timeout, rt);
377
378 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
379
380 if (rt == ready_none)
381 rt = ready_t; // remembered from add
382
383 if (rt & ready_write)
384 {
385 return make_pair(pipe->get_socketfd(), ready_write);
386 }
387
388 if (rt & ready_read)
389 {
390 if (pipe->bytes_available == 0)
391 {
392 // Issue an async request to fill our buffer.
393 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
394 sizeof(pipe->readbuf), NULL, &pipe->overlap);
395 E(ok || GetLastError() == ERROR_IO_PENDING,
396 F("ReadFile call failed: %s") % err_msg());
397 pipe->read_in_progress = true;
398 }
399
400 if (pipe->read_in_progress)
401 {
402 I(pipe->bytes_available == 0);
403
404 // Attempt to wait for the completion of the read-in-progress.
405
406 int milliseconds = ((timeout.get_sec() * 1000)
407 + (timeout.get_usec() / 1000));
408
409 L(FL("WaitForSingleObject(,%d)") % milliseconds);
410
411 DWORD wstatus = WAIT_FAILED;
412
413 if (pipe->child != INVALID_HANDLE_VALUE)
414 {
415
416 // We're a server; we're going to wait for the client to
417 // exit as well as the pipe read status, because
418 // apparently you don't find out about closed pipes
419 // during an overlapped read request (?)
420
421 HANDLE handles[2];
422 handles[0] = pipe->overlap.hEvent;
423 handles[1] = pipe->child;
424
425 wstatus = WaitForMultipleObjects(2,
426 handles,
427 FALSE,
428 milliseconds);
429
430 E(wstatus != WAIT_FAILED,
431 F("WaitForMultipleObjects call failed: %s") % err_msg());
432
433 if (wstatus == WAIT_OBJECT_0 + 1)
434 return make_pair(pipe->get_socketfd(), ready_oobd);
435 }
436 else
437 {
438 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
439 milliseconds);
440 E(wstatus != WAIT_FAILED,
441 F("WaitForSingleObject call failed: %s") % err_msg());
442 }
443
444 if (wstatus == WAIT_TIMEOUT)
445 return make_pair(-1, ready_none);
446
447 BOOL ok = GetOverlappedResult(pipe->named_pipe,
448 &pipe->overlap,
449 &pipe->bytes_available,
450 FALSE);
451
452 if (ok)
453 {
454 // We completed our read.
455 pipe->read_in_progress = false;
456 }
457 else
458 {
459 // We did not complete our read.
460 E(GetLastError() == ERROR_IO_INCOMPLETE,
461 F("GetOverlappedResult call failed: %s")
462 % err_msg());
463 }
464 }
465
466 if (pipe->bytes_available != 0)
467 {
468 return make_pair(pipe->get_socketfd(), ready_read);
469 }
470 }
471
472 return make_pair(pipe->get_socketfd(), ready_none);
473}
474
475void
476Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
477{
478 assert(!is_pipe);
479 assert(!pipe);
480 is_pipe = true;
481 pipe = &ps;
482 ready_t = rt;
483}
484
485void
486Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
487{
488 // FIXME: This is *still* an unfortunate way of performing a
489 // downcast, though slightly less awful than the old way, which
490 // involved throwing an exception.
491 //
492 // Perhaps we should twiddle the caller-visible API.
493
494 StreamBase const *sbp = &sb;
495 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
496 if (psp)
497 add(const_cast<PipeStream&>(*psp),rt);
498 else
499 {
500 assert(!is_pipe);
501 Probe::add(sb,rt);
502 }
503}
504
505void
506Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
507{
508 assert(!is_pipe);
509 Probe::add(ss,rt);
510}
511#else // unix
512void
513Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
514 {
515 if (rt == ready_none || rt & ready_read)
516 add_socket(ps.get_readfd(), ready_read);
517 if (rt == ready_none || rt & ready_write)
518 add_socket(ps.get_writefd(), ready_write);
519 }
520
521void
522Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
523{
524 try
525 {
526 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
527 }
528 catch (...)
529 {
530 Probe::add(sb,rt);
531 }
532}
533
534void
535Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
536{
537 Probe::add(ss,rt);
538}
539#endif
540
541#ifdef BUILD_UNIT_TESTS
542#include "unit_tests.hh"
543
544UNIT_TEST(pipe, simple_pipe)
545{ try
546 {
547 Netxx::PipeStream pipe("cat",vector<string>());
548
549 string result;
550 Netxx::PipeCompatibleProbe probe;
551 Netxx::Timeout timeout(2L), short_time(0,1000);
552
553 // time out because no data is available
554 probe.clear();
555 probe.add(pipe, Netxx::Probe::ready_read);
556 Netxx::Probe::result_type res = probe.ready(short_time);
557 I(res.second==Netxx::Probe::ready_none);
558
559 // write should be possible
560 probe.clear();
561 probe.add(pipe, Netxx::Probe::ready_write);
562 res = probe.ready(short_time);
563 I(res.second & Netxx::Probe::ready_write);
564#ifdef WIN32
565 I(res.first==pipe.get_socketfd());
566#else
567 I(res.first==pipe.get_writefd());
568#endif
569
570 // try binary transparency
571 for (int c = 0; c < 256; ++c)
572 {
573 char buf[1024];
574 buf[0] = c;
575 buf[1] = 255 - c;
576 pipe.write(buf, 2);
577
578 string result;
579 while (result.size() < 2)
580 { // wait for data to arrive
581 probe.clear();
582 probe.add(pipe, Netxx::Probe::ready_read);
583 res = probe.ready(timeout);
584 E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
585#ifdef WIN32
586 I(res.first == pipe.get_socketfd());
587#else
588 I(res.first == pipe.get_readfd());
589#endif
590 int bytes = pipe.read(buf, sizeof(buf));
591 result += string(buf, bytes);
592 }
593 I(result.size() == 2);
594 I(static_cast<unsigned char>(result[0]) == c);
595 I(static_cast<unsigned char>(result[1]) == 255 - c);
596 }
597
598 pipe.close();
599
600 }
601catch (informative_failure &e)
602 // for some reason boost does not provide
603 // enough information
604 {
605 W(F("Failure %s") % e.what());
606 throw;
607 }
608}
609#endif
610
611// Local Variables:
612// mode: C++
613// fill-column: 76
614// c-file-style: "gnu"
615// indent-tabs-mode: nil
616// End:
617// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status