monotone

monotone Mtn Source Tree

Root/src/netxx_pipe.cc

1// Copyright (C) 2005 Christof Petig <christof@petig-baender.de>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include "netxx_pipe.hh"
12#include "sanity.hh"
13#include "platform.hh"
14#include "netxx/streamserver.h"
15#include <cstring> // strerror
16#include <cstdlib> // exit
17#include <cassert> // assert
18
19#ifdef WIN32
20#include <windows.h>
21#include <io.h>
22#include <fcntl.h>
23#else
24#include <unistd.h>
25#include <fcntl.h>
26#include <sys/wait.h>
27#include <errno.h>
28#endif
29
30using std::vector;
31using std::string;
32using std::make_pair;
33using std::exit;
34using std::perror;
35using std::strerror;
36
37Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
38 :
39#ifdef WIN32
40 child(INVALID_HANDLE_VALUE),
41 bytes_available(0),
42 read_in_progress(false)
43#else
44 readfd(_readfd),
45 writefd(_writefd),
46 child(0)
47#endif
48{
49#ifdef WIN32
50 E(0, origin::system, F("this transport not supported on native Win32; use Cygwin"));
51
52 // keeping code in case someone wants to try fixing it
53
54 if (_setmode(_readfd, _O_BINARY) == -1)
55 L(FL("failed to set input file descriptor to binary"));
56
57 if (_setmode(_writefd, _O_BINARY) == -1)
58 L(FL("failed to set output file descriptor to binary"));
59
60 named_pipe = (HANDLE)_get_osfhandle(_readfd);
61
62 E(named_pipe != INVALID_HANDLE_VALUE, origin::system,
63 F("pipe handle is invalid"));
64
65 // Create infrastructure for overlapping I/O
66 memset(&overlap, 0, sizeof(overlap));
67 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
68 bytes_available = 0;
69 I(overlap.hEvent != 0);
70#else
71 int flags = fcntl(readfd, F_GETFL, 0);
72 I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
73 flags = fcntl(writefd, F_GETFL, 0);
74 I(fcntl(writefd, F_SETFL, flags | O_NONBLOCK) != -1);
75#endif
76}
77
78
79#ifndef WIN32
80
81// Create pipes for stdio and fork subprocess, returns -1 on error, 0
82// to child and PID to parent.
83
84static pid_t
85pipe_and_fork(int fd1[2], int fd2[2])
86{
87 pid_t result = -1;
88 fd1[0] = -1;
89 fd1[1] = -1;
90 fd2[0] = -1;
91 fd2[1] = -1;
92
93 if (pipe(fd1))
94 return -1;
95
96 if (pipe(fd2))
97 {
98 close(fd1[0]);
99 close(fd1[1]);
100 return -1;
101 }
102
103 result = fork();
104
105 if (result < 0)
106 {
107 close(fd1[0]);
108 close(fd1[1]);
109 close(fd2[0]);
110 close(fd2[1]);
111 return -1;
112 }
113
114 else if (!result)
115 {
116 // child process; replace our stdin, stdout with the child side of the
117 // pipes.
118 //
119 // fd1[1] for writing (stdout, file descriptor 1),
120 // fd2[0] for reading (stdin, file descriptor 0)
121 //
122 // Note that stderr is not affected; child writes to stderr go
123 // directly to parent stderr stream.
124 close(fd1[0]);
125 close(fd2[1]);
126 if (dup2(fd2[0], 0) != 0 ||
127 dup2(fd1[1], 1) != 1)
128 {
129 perror("dup2");
130 exit(-1); // kill the useless child
131 }
132 close(fd1[1]);
133 close(fd2[0]);
134 }
135
136 else
137 {
138 // fd1[0] for reading, fd2[1] for writing
139 close(fd1[1]);
140 close(fd2[0]);
141 }
142
143 return result;
144}
145#endif
146
147#ifdef WIN32
148static string
149err_msg()
150{
151 char buf[1024];
152 I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
153 NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
154 (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
155 return string(buf);
156}
157#endif
158
159
160Netxx::PipeStream::PipeStream (const string & cmd,
161 const vector<string> & args)
162 :
163#ifdef WIN32
164 child(INVALID_HANDLE_VALUE),
165 bytes_available(0),
166 read_in_progress(false)
167#else
168 readfd(-1),
169 writefd(-1),
170 child(0)
171#endif
172{
173 // Unfortunately neither munge_argv_into_cmdline nor execvp do take
174 // a vector<string> as argument.
175
176 const unsigned newsize = 64;
177 const char *newargv[newsize];
178 I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
179
180 unsigned newargc = 0;
181 newargv[newargc++]=cmd.c_str();
182 for (vector<string>::const_iterator i = args.begin();
183 i != args.end(); ++i)
184 newargv[newargc++] = i->c_str();
185 newargv[newargc] = 0;
186
187#ifdef WIN32
188
189 E(0, origin::system, F("this transport not supported on native Win32; use Cygwin"));
190
191 // keeping code in case someone wants to try fixing it
192
193 // In order to use nonblocking i/o on windows, you must use named
194 // pipes and overlapped i/o. There is no other way, alas.
195
196 static unsigned long serial = 0;
197 string pipename = (FL("\\\\.\\pipe\\netxx_pipe_%ld_%d")
198 % GetCurrentProcessId()
199 % (++serial)).str();
200
201 // Create the parent's handle to the named pipe.
202
203 named_pipe = CreateNamedPipe(pipename.c_str(),
204 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
205 PIPE_TYPE_BYTE | PIPE_WAIT,
206 1,
207 sizeof(readbuf),
208 sizeof(readbuf),
209 1000,
210 0);
211
212 E(named_pipe != INVALID_HANDLE_VALUE, origin::system,
213 F("CreateNamedPipe(%s,...) call failed: %s")
214 % pipename % err_msg());
215
216 // Open the child's handle to the named pipe.
217
218 SECURITY_ATTRIBUTES inherit;
219 memset(&inherit,0,sizeof inherit);
220 inherit.nLength=sizeof inherit;
221 inherit.bInheritHandle = TRUE;
222
223 HANDLE hpipe = CreateFile(pipename.c_str(),
224 GENERIC_READ|GENERIC_WRITE, 0,
225 &inherit,
226 OPEN_EXISTING,
227 FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
228
229 E(hpipe != INVALID_HANDLE_VALUE, origin::system,
230 F("CreateFile(%s,...) call failed: %s")
231 % pipename % err_msg());
232
233 // Set up the child with the pipes as stdin/stdout and inheriting stderr.
234
235 PROCESS_INFORMATION piProcInfo;
236 STARTUPINFO siStartInfo;
237
238 memset(&piProcInfo, 0, sizeof(piProcInfo));
239 memset(&siStartInfo, 0, sizeof(siStartInfo));
240
241 siStartInfo.cb = sizeof(siStartInfo);
242 siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
243 siStartInfo.hStdOutput = hpipe;
244 siStartInfo.hStdInput = hpipe;
245 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
246
247 string cmdline = munge_argv_into_cmdline(newargv);
248 L(FL("Subprocess command line: '%s'") % cmdline);
249
250 BOOL started = CreateProcess(NULL, // Application name
251 const_cast<CHAR*>(cmdline.c_str()),
252 NULL, // Process attributes
253 NULL, // Thread attributes
254 TRUE, // Inherit handles
255 0, // Creation flags
256 NULL, // Environment
257 NULL, // Current directory
258 &siStartInfo,
259 &piProcInfo);
260 E(started, origin::system,
261 F("CreateProcess(%s,...) call failed: %s")
262 % cmdline % err_msg());
263
264 child = piProcInfo.hProcess;
265
266 // create infrastructure for overlapping I/O
267
268 memset(&overlap, 0, sizeof(overlap));
269 overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
270 bytes_available = 0;
271 I(overlap.hEvent != 0);
272
273#else // !WIN32
274
275 int fd1[2], fd2[2];
276 child = pipe_and_fork(fd1, fd2);
277 E(child >= 0, origin::system, F("pipe/fork failed: %s") % strerror(errno));
278 if (!child)
279 {
280 execvp(newargv[0], const_cast<char * const *>(newargv));
281 perror(newargv[0]);
282 exit(errno);
283 }
284 readfd = fd1[0];
285 writefd = fd2[1];
286 fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
287#endif
288
289 // P(F("mtn %d: set up i/o channels")
290 // % GetCurrentProcessId());
291}
292
293// Non blocking read.
294
295Netxx::signed_size_type
296Netxx::PipeStream::read (void *buffer, size_type length)
297{
298#ifdef WIN32
299
300 if (length > bytes_available)
301 length = bytes_available;
302
303 if (length)
304 {
305 memcpy(buffer, readbuf, length);
306 if (length < bytes_available)
307 memmove(readbuf, readbuf+length, bytes_available-length);
308 bytes_available -= length;
309 }
310
311 return length;
312#else
313 return ::read(readfd, buffer, length);
314#endif
315}
316
317Netxx::signed_size_type
318Netxx::PipeStream::write(const void *buffer, size_type length)
319{
320#ifdef WIN32
321 DWORD written = 0;
322 BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
323 E(ok, origin::system, F("WriteFile call failed: %s") % err_msg());
324#else
325 size_t written = ::write(writefd, buffer, length);
326#endif
327 return written;
328}
329
330void
331Netxx::PipeStream::close (void)
332{
333
334#ifdef WIN32
335 if (named_pipe != INVALID_HANDLE_VALUE)
336 CloseHandle(named_pipe);
337 named_pipe = INVALID_HANDLE_VALUE;
338
339 if (overlap.hEvent != INVALID_HANDLE_VALUE)
340 CloseHandle(overlap.hEvent);
341 overlap.hEvent = INVALID_HANDLE_VALUE;
342
343 if (child != INVALID_HANDLE_VALUE)
344 WaitForSingleObject(child, INFINITE);
345 child = INVALID_HANDLE_VALUE;
346#else
347 if (readfd != -1)
348 ::close(readfd);
349 readfd = -1;
350
351 if (writefd != -1)
352 ::close(writefd);
353 writefd = -1;
354
355 if (child)
356 while (waitpid(child,0,0) == -1 && errno == EINTR) ;
357 child = 0;
358#endif
359}
360
361Netxx::socket_type
362Netxx::PipeStream::get_socketfd (void) const
363{
364#ifdef WIN32
365 return (Netxx::socket_type) named_pipe;
366#else
367 return Netxx::socket_type(-1);
368#endif
369}
370
371const Netxx::ProbeInfo*
372Netxx::PipeStream::get_probe_info (void) const
373{
374 return 0;
375}
376
377#ifdef WIN32
378
379static string
380status_name(DWORD wstatus)
381{
382 switch (wstatus) {
383 case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
384 case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
385 case WAIT_FAILED: return "WAIT_FAILED";
386 case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
387 default: return "UNKNOWN";
388 }
389}
390
391Netxx::Probe::result_type
392Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
393{
394 if (!is_pipe)
395 return Probe::ready(timeout, rt);
396
397 // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
398
399 if (rt == ready_none)
400 rt = ready_t; // remembered from add
401
402 if (rt & ready_write)
403 {
404 return make_pair(pipe->get_socketfd(), ready_write);
405 }
406
407 if (rt & ready_read)
408 {
409 if (pipe->bytes_available == 0)
410 {
411 // Issue an async request to fill our buffer.
412 BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
413 sizeof(pipe->readbuf), NULL, &pipe->overlap);
414 E(ok || GetLastError() == ERROR_IO_PENDING, origin::system,
415 F("ReadFile call failed: %s") % err_msg());
416 pipe->read_in_progress = true;
417 }
418
419 if (pipe->read_in_progress)
420 {
421 I(pipe->bytes_available == 0);
422
423 // Attempt to wait for the completion of the read-in-progress.
424
425 int milliseconds = ((timeout.get_sec() * 1000)
426 + (timeout.get_usec() / 1000));
427
428 L(FL("WaitForSingleObject(,%d)") % milliseconds);
429
430 DWORD wstatus = WAIT_FAILED;
431
432 if (pipe->child != INVALID_HANDLE_VALUE)
433 {
434
435 // We're a server; we're going to wait for the client to
436 // exit as well as the pipe read status, because
437 // apparently you don't find out about closed pipes
438 // during an overlapped read request (?)
439
440 HANDLE handles[2];
441 handles[0] = pipe->overlap.hEvent;
442 handles[1] = pipe->child;
443
444 wstatus = WaitForMultipleObjects(2,
445 handles,
446 FALSE,
447 milliseconds);
448
449 E(wstatus != WAIT_FAILED, origin::system,
450 F("WaitForMultipleObjects call failed: %s") % err_msg());
451
452 if (wstatus == WAIT_OBJECT_0 + 1)
453 return make_pair(pipe->get_socketfd(), ready_oobd);
454 }
455 else
456 {
457 wstatus = WaitForSingleObject(pipe->overlap.hEvent,
458 milliseconds);
459 E(wstatus != WAIT_FAILED, origin::system,
460 F("WaitForSingleObject call failed: %s") % err_msg());
461 }
462
463 if (wstatus == WAIT_TIMEOUT)
464 return make_pair(-1, ready_none);
465
466 BOOL ok = GetOverlappedResult(pipe->named_pipe,
467 &pipe->overlap,
468 &pipe->bytes_available,
469 FALSE);
470
471 if (ok)
472 {
473 // We completed our read.
474 pipe->read_in_progress = false;
475 }
476 else
477 {
478 // We did not complete our read.
479 E(GetLastError() == ERROR_IO_INCOMPLETE, origin::system,
480 F("GetOverlappedResult call failed: %s")
481 % err_msg());
482 }
483 }
484
485 if (pipe->bytes_available != 0)
486 {
487 return make_pair(pipe->get_socketfd(), ready_read);
488 }
489 }
490
491 return make_pair(pipe->get_socketfd(), ready_none);
492}
493
494void
495Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
496{
497 assert(!is_pipe);
498 assert(!pipe);
499 is_pipe = true;
500 pipe = &ps;
501 ready_t = rt;
502}
503
504void
505Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
506{
507 // FIXME: This is *still* an unfortunate way of performing a
508 // downcast, though slightly less awful than the old way, which
509 // involved throwing an exception.
510 //
511 // Perhaps we should twiddle the caller-visible API.
512
513 StreamBase const *sbp = &sb;
514 PipeStream const *psp = dynamic_cast<PipeStream const *>(sbp);
515 if (psp)
516 add(const_cast<PipeStream&>(*psp),rt);
517 else
518 {
519 assert(!is_pipe);
520 Probe::add(sb,rt);
521 }
522}
523
524void
525Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
526{
527 assert(!is_pipe);
528 Probe::add(ss,rt);
529}
530#else // unix
531void
532Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
533 {
534 if (rt == ready_none || rt & ready_read)
535 add_socket(ps.get_readfd(), ready_read);
536 if (rt == ready_none || rt & ready_write)
537 add_socket(ps.get_writefd(), ready_write);
538 }
539
540void
541Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
542{
543 try
544 {
545 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
546 }
547 catch (...)
548 {
549 Probe::add(sb,rt);
550 }
551}
552
553void
554Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
555{
556 Probe::add(ss,rt);
557}
558#endif
559
560
561// Local Variables:
562// mode: C++
563// fill-column: 76
564// c-file-style: "gnu"
565// indent-tabs-mode: nil
566// End:
567// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status