monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <netxx_pipe.hh>
12#include "sanity.hh"
13#include "platform.hh"
14#include <netxx/streamserver.h>
15#include <ostream> // for operator<<
16
17#ifdef WIN32
18#include <windows.h>
19#include <io.h>
20#include <fcntl.h>
21#else
22#include <unistd.h>
23#include <fcntl.h>
24#include <sys/wait.h>
25#include <errno.h>
26#include <cstring> // strerror
27#endif
28
29using std::vector;
30using std::string;
31using std::make_pair;
32using std::exit;
33using std::perror;
34using std::strerror;
35
36Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
37 :
38#ifdef WIN32
39 child(INVALID_HANDLE_VALUE),
40 bytes_available(0),
41 read_in_progress(false)
42#else
43 readfd(_readfd),
44 writefd(_writefd),
45 child(0)
46#endif
47{
48#ifdef WIN32
49 E(0, F("this transport not supported on native Win32; use Cygwin"));
50
51 // keeping code in case someone wants to try fixing it
52
53 if (_setmode(_readfd, _O_BINARY) == -1)
54 L(FL("failed to set input file descriptor to binary"));
55
56 if (_setmode(_writefd, _O_BINARY) == -1)
57 L(FL("failed to set output file descriptor to binary"));
58
59 named_pipe = (HANDLE)_get_osfhandle(_readfd);
60
61 E(named_pipe != INVALID_HANDLE_VALUE,
62 F("pipe handle is invalid"));
63
64 // Create infrastructure for overlapping I/O
65 memset(&overlap, 0, sizeof(overlap));
66 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
67 bytes_available = 0;
68 I(overlap.hEvent != 0);
69#else
70 int flags = fcntl(readfd, F_GETFL, 0);
71 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
72 flags = fcntl(writefd, F_GETFL, 0);
73 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
74#endif
75}
76
77
78#ifndef WIN32
79
80// Create pipes for stdio and fork subprocess, returns -1 on error, 0
81// to child and PID to parent.
82
83static pid_t
84pipe_and_fork(int fd1[2], int fd2[2])
85{
86 pid_t result = -1;
87 fd1[0] = -1;
88 fd1[1] = -1;
89 fd2[0] = -1;
90 fd2[1] = -1;
91
92 if (pipe(fd1))
93 return -1;
94
95 if (pipe(fd2))
96 {
97 close(fd1[0]);
98 close(fd1[1]);
99 return -1;
100 }
101
102 result = fork();
103
104 if (result < 0)
105 {
106 close(fd1[0]);
107 close(fd1[1]);
108 close(fd2[0]);
109 close(fd2[1]);
110 return -1;
111 }
112
113 else if (!result)
114 {
115 // fd1[1] for writing, fd2[0] for reading
116 close(fd1[0]);
117 close(fd2[1]);
118 if (dup2(fd2[0], 0) != 0 ||
119 dup2(fd1[1], 1) != 1)
120 {
121 perror("dup2");
122 exit(-1); // kill the useless child
123 }
124 close(fd1[1]);
125 close(fd2[0]);
126 }
127
128 else
129 {
130 // fd1[0] for reading, fd2[1] for writing
131 close(fd1[1]);
132 close(fd2[0]);
133 }
134
135 return result;
136}
137#endif
138
139#ifdef WIN32
140static string
141err_msg()
142{
143 char buf[1024];
144 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
145 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
146 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
147 return string(buf);
148}
149#endif
150
151
152Netxx::PipeStream::PipeStream (const string & cmd,
153 const vector<string> & args)
154 :
155#ifdef WIN32
156 child(INVALID_HANDLE_VALUE),
157 bytes_available(0),
158 read_in_progress(false)
159#else
160 readfd(-1),
161 writefd(-1),
162 child(0)
163#endif
164{
165 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
166 // a vector<string> as argument.
167
168 const unsigned newsize = 64;
169 const char *newargv[newsize];
170 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
171
172 unsigned newargc = 0;
173 newargv[newargc++]=cmd.c_str();
174 for (vector<string>::const_iterator i = args.begin();
175 i != args.end(); ++i)
176 newargv[newargc++] = i->c_str();
177 newargv[newargc] = 0;
178
179#ifdef WIN32
180
181 E(0, F("this transport not supported on native Win32; use Cygwin"));
182
183 // keeping code in case someone wants to try fixing it
184
185 // In order to use nonblocking i/o on windows, you must use named
186 // pipes and overlapped i/o. There is no other way, alas.
187
188 static unsigned long serial = 0;
189 string pipename = (FL("\\\\.\\pipe\\netxx_pipe_%ld_%d")
190 % GetCurrentProcessId()
191 % (++serial)).str();
192
193 // Create the parent's handle to the named pipe.
194
195 named_pipe = CreateNamedPipe(pipename.c_str(),
196 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
197 PIPE_TYPE_BYTE | PIPE_WAIT,
198 1,
199 sizeof(readbuf),
200 sizeof(readbuf),
201 1000,
202 0);
203
204 E(named_pipe != INVALID_HANDLE_VALUE,
205 F("CreateNamedPipe(%s,...) call failed: %s")
206 % pipename % err_msg());
207
208 // Open the child's handle to the named pipe.
209
210 SECURITY_ATTRIBUTES inherit;
211 memset(&inherit,0,sizeof inherit);
212 inherit.nLength=sizeof inherit;
213 inherit.bInheritHandle = TRUE;
214
215 HANDLE hpipe = CreateFile(pipename.c_str(),
216 GENERIC_READ|GENERIC_WRITE, 0,
217 &inherit,
218 OPEN_EXISTING,
219 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
220
221 E(hpipe != INVALID_HANDLE_VALUE,
222 F("CreateFile(%s,...) call failed: %s")
223 % pipename % err_msg());
224
225 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
226
227 PROCESS_INFORMATION piProcInfo;
228 STARTUPINFO siStartInfo;
229
230 memset(&piProcInfo, 0, sizeof(piProcInfo));
231 memset(&siStartInfo, 0, sizeof(siStartInfo));
232
233 siStartInfo.cb = sizeof(siStartInfo);
234 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
235 siStartInfo.hStdOutput = hpipe;
236 siStartInfo.hStdInput = hpipe;
237 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
238
239 string cmdline = munge_argv_into_cmdline(newargv);
240 L(FL("Subprocess command line: '%s'") % cmdline);
241
242 BOOL started = CreateProcess(NULL, // Application name
243 const_cast<CHAR*>(cmdline.c_str()),
244 NULL, // Process attributes
245 NULL, // Thread attributes
246 TRUE, // Inherit handles
247 0, // Creation flags
248 NULL, // Environment
249 NULL, // Current directory
250 &siStartInfo,
251 &piProcInfo);
252 E(started,
253 F("CreateProcess(%s,...) call failed: %s")
254 % cmdline % err_msg());
255
256 child = piProcInfo.hProcess;
257
258 // create infrastructure for overlapping I/O
259
260 memset(&overlap, 0, sizeof(overlap));
261 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
262 bytes_available = 0;
263 I(overlap.hEvent != 0);
264
265#else // !WIN32
266
267 int fd1[2], fd2[2];
268 child = pipe_and_fork(fd1, fd2);
269 E(child >= 0, F("pipe/fork failed: %s") % strerror(errno));
270 if (!child)
271 {
272 execvp(newargv[0], const_cast<char * const *>(newargv));
273 perror(newargv[0]);
274 exit(errno);
275 }
276 readfd = fd1[0];
277 writefd = fd2[1];
278 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
279#endif
280
281 // P(F("mtn %d: set up i/o channels")
282 // % GetCurrentProcessId());
283}
284
285// Non blocking read.
286
287Netxx::signed_size_type
288Netxx::PipeStream::read (void *buffer, size_type length)
289{
290#ifdef WIN32
291
292 if (length > bytes_available)
293 length = bytes_available;
294
295 if (length)
296 {
297 memcpy(buffer, readbuf, length);
298 if (length < bytes_available)
299 memmove(readbuf, readbuf+length, bytes_available-length);
300 bytes_available -= length;
301 }
302
303 return length;
304#else
305 return ::read(readfd, buffer, length);
306#endif
307}
308
309Netxx::signed_size_type
310Netxx::PipeStream::write(const void *buffer, size_type length)
311{
312#ifdef WIN32
313 DWORD written = 0;
314 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
315 E(ok, F("WriteFile call failed: %s") % err_msg());
316#else
317 size_t written = ::write(writefd, buffer, length);
318#endif
319 return written;
320}
321
322void
323Netxx::PipeStream::close (void)
324{
325
326#ifdef WIN32
327 if (named_pipe != INVALID_HANDLE_VALUE)
328 CloseHandle(named_pipe);
329 named_pipe = INVALID_HANDLE_VALUE;
330
331 if (overlap.hEvent != INVALID_HANDLE_VALUE)
332 CloseHandle(overlap.hEvent);
333 overlap.hEvent = INVALID_HANDLE_VALUE;
334
335 if (child != INVALID_HANDLE_VALUE)
336 WaitForSingleObject(child, INFINITE);
337 child = INVALID_HANDLE_VALUE;
338#else
339 if (readfd != -1)
340 ::close(readfd);
341 readfd = -1;
342
343 if (writefd != -1)
344 ::close(writefd);
345 writefd = -1;
346
347 if (child)
348 while (waitpid(child,0,0) == -1 && errno == EINTR) ;
349 child = 0;
350#endif
351}
352
353Netxx::socket_type
354Netxx::PipeStream::get_socketfd (void) const
355{
356#ifdef WIN32
357 return (Netxx::socket_type) named_pipe;
358#else
359 return Netxx::socket_type(-1);
360#endif
361}
362
363const Netxx::ProbeInfo*
364Netxx::PipeStream::get_probe_info (void) const
365{
366 return 0;
367}
368
369#ifdef WIN32
370
371static string
372status_name(DWORD wstatus)
373{
374 switch (wstatus) {
375 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
376 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
377 case WAIT_FAILED: return "WAIT_FAILED";
378 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
379 default: return "UNKNOWN";
380 }
381}
382
383Netxx::Probe::result_type
384Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
385{
386 if (!is_pipe)
387 return Probe::ready(timeout, rt);
388
389 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
390
391 if (rt == ready_none)
392 rt = ready_t; // remembered from add
393
394 if (rt & ready_write)
395 {
396 return make_pair(pipe->get_socketfd(), ready_write);
397 }
398
399 if (rt & ready_read)
400 {
401 if (pipe->bytes_available == 0)
402 {
403 // Issue an async request to fill our buffer.
404 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
405 sizeof(pipe->readbuf), NULL, &pipe->overlap);
406 E(ok || GetLastError() == ERROR_IO_PENDING,
407 F("ReadFile call failed: %s") % err_msg());
408 pipe->read_in_progress = true;
409 }
410
411 if (pipe->read_in_progress)
412 {
413 I(pipe->bytes_available == 0);
414
415 // Attempt to wait for the completion of the read-in-progress.
416
417 int milliseconds = ((timeout.get_sec() * 1000)
418 + (timeout.get_usec() / 1000));
419
420 L(FL("WaitForSingleObject(,%d)") % milliseconds);
421
422 DWORD wstatus = WAIT_FAILED;
423
424 if (pipe->child != INVALID_HANDLE_VALUE)
425 {
426
427 // We're a server; we're going to wait for the client to
428 // exit as well as the pipe read status, because
429 // apparently you don't find out about closed pipes
430 // during an overlapped read request (?)
431
432 HANDLE handles[2];
433 handles[0] = pipe->overlap.hEvent;
434 handles[1] = pipe->child;
435
436 wstatus = WaitForMultipleObjects(2,
437 handles,
438 FALSE,
439 milliseconds);
440
441 E(wstatus != WAIT_FAILED,
442 F("WaitForMultipleObjects call failed: %s") % err_msg());
443
444 if (wstatus == WAIT_OBJECT_0 + 1)
445 return make_pair(pipe->get_socketfd(), ready_oobd);
446 }
447 else
448 {
449 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
450 milliseconds);
451 E(wstatus != WAIT_FAILED,
452 F("WaitForSingleObject call failed: %s") % err_msg());
453 }
454
455 if (wstatus == WAIT_TIMEOUT)
456 return make_pair(-1, ready_none);
457
458 BOOL ok = GetOverlappedResult(pipe->named_pipe,
459 &pipe->overlap,
460 &pipe->bytes_available,
461 FALSE);
462
463 if (ok)
464 {
465 // We completed our read.
466 pipe->read_in_progress = false;
467 }
468 else
469 {
470 // We did not complete our read.
471 E(GetLastError() == ERROR_IO_INCOMPLETE,
472 F("GetOverlappedResult call failed: %s")
473 % err_msg());
474 }
475 }
476
477 if (pipe->bytes_available != 0)
478 {
479 return make_pair(pipe->get_socketfd(), ready_read);
480 }
481 }
482
483 return make_pair(pipe->get_socketfd(), ready_none);
484}
485
486void
487Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
488{
489 assert(!is_pipe);
490 assert(!pipe);
491 is_pipe = true;
492 pipe = &ps;
493 ready_t = rt;
494}
495
496void
497Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
498{
499 // FIXME: This is *still* an unfortunate way of performing a
500 // downcast, though slightly less awful than the old way, which
501 // involved throwing an exception.
502 //
503 // Perhaps we should twiddle the caller-visible API.
504
505 StreamBase const *sbp = &sb;
506 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
507 if (psp)
508 add(const_cast<PipeStream&>(*psp),rt);
509 else
510 {
511 assert(!is_pipe);
512 Probe::add(sb,rt);
513 }
514}
515
516void
517Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
518{
519 assert(!is_pipe);
520 Probe::add(ss,rt);
521}
522#else // unix
523void
524Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
525 {
526 if (rt == ready_none || rt & ready_read)
527 add_socket(ps.get_readfd(), ready_read);
528 if (rt == ready_none || rt & ready_write)
529 add_socket(ps.get_writefd(), ready_write);
530 }
531
532void
533Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
534{
535 try
536 {
537 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
538 }
539 catch (...)
540 {
541 Probe::add(sb,rt);
542 }
543}
544
545void
546Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
547{
548 Probe::add(ss,rt);
549}
550#endif
551
552#ifdef BUILD_UNIT_TESTS
553#include "unit_tests.hh"
554
555#ifndef WIN32
556
557UNIT_TEST(pipe, simple_pipe)
558{ try
559 {
560 Netxx::PipeStream pipe("cat",vector<string>());
561
562 string result;
563 Netxx::PipeCompatibleProbe probe;
564 Netxx::Timeout timeout(2L), short_time(0,1000);
565
566 // time out because no data is available
567 probe.clear();
568 probe.add(pipe, Netxx::Probe::ready_read);
569 Netxx::Probe::result_type res = probe.ready(short_time);
570 I(res.second==Netxx::Probe::ready_none);
571
572 // write should be possible
573 probe.clear();
574 probe.add(pipe, Netxx::Probe::ready_write);
575 res = probe.ready(short_time);
576 I(res.second & Netxx::Probe::ready_write);
577#ifdef WIN32
578 I(res.first==pipe.get_socketfd());
579#else
580 I(res.first==pipe.get_writefd());
581#endif
582
583 // try binary transparency
584 for (int c = 0; c < 256; ++c)
585 {
586 char buf[1024];
587 buf[0] = c;
588 buf[1] = 255 - c;
589 pipe.write(buf, 2);
590
591 string result;
592 while (result.size() < 2)
593 { // wait for data to arrive
594 probe.clear();
595 probe.add(pipe, Netxx::Probe::ready_read);
596 res = probe.ready(timeout);
597 E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
598#ifdef WIN32
599 I(res.first == pipe.get_socketfd());
600#else
601 I(res.first == pipe.get_readfd());
602#endif
603 int bytes = pipe.read(buf, sizeof(buf));
604 result += string(buf, bytes);
605 }
606 I(result.size() == 2);
607 I(static_cast<unsigned char>(result[0]) == c);
608 I(static_cast<unsigned char>(result[1]) == 255 - c);
609 }
610
611 pipe.close();
612
613 }
614catch (informative_failure &e)
615 // for some reason boost does not provide
616 // enough information
617 {
618 W(F("Failure %s") % e.what());
619 throw;
620 }
621}
622#endif
623#endif
624
625// Local Variables:
626// mode: C++
627// fill-column: 76
628// c-file-style: "gnu"
629// indent-tabs-mode: nil
630// End:
631// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status