monotone

monotone Mtn Source Tree

Root/network.cc

1// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6// this file contains basic utilities for dealing with the network
7// protocols monotone knows how to speak, and the queueing / posting system
8// which interacts with the network via those protocols.
9
10#include <sys/time.h>
11#include <netdb.h>
12#include <arpa/inet.h>
13
14#include "boost/socket/socket_errors.hpp"
15#include "boost/socket/any_protocol.hpp"
16#include "boost/socket/any_address.hpp"
17#include "boost/socket/ip4.hpp"
18#include "boost/socket/connector_socket.hpp"
19#include "boost/socket/socketstream.hpp"
20#include "boost/socket/socket_exception.hpp"
21
22#include "adns/adns.h"
23
24#include <boost/spirit.hpp>
25
26#include <ctype.h>
27
28#include <boost/shared_ptr.hpp>
29#include <string>
30#include <iostream>
31
32#include "app_state.hh"
33#include "constants.hh"
34#include "database.hh"
35#include "http_tasks.hh"
36#include "keys.hh"
37#include "lua.hh"
38#include "network.hh"
39#include "nntp_tasks.hh"
40#include "patch_set.hh"
41#include "sanity.hh"
42#include "smtp_tasks.hh"
43#include "transforms.hh"
44#include "url.hh"
45
46using namespace std;
47using namespace boost::spirit;
48using boost::lexical_cast;
49using boost::shared_ptr;
50
51
52struct monotone_socket_error_policy
53{
54
55 boost::socket::socket_errno
56 handle_error(boost::socket::function::name fn,
57 boost::socket::socket_errno error)
58 {
59 string func("unknown");
60
61 switch (fn)
62 {
63#define TRANSLATE_FUNC(xx) case boost::socket::function::xx: func = #xx; break;
64TRANSLATE_FUNC(ioctl);
65TRANSLATE_FUNC(getsockopt);
66TRANSLATE_FUNC(setsockopt);
67TRANSLATE_FUNC(open);
68TRANSLATE_FUNC(connect);
69TRANSLATE_FUNC(bind);
70TRANSLATE_FUNC(listen);
71TRANSLATE_FUNC(accept);
72TRANSLATE_FUNC(recv);
73TRANSLATE_FUNC(send);
74TRANSLATE_FUNC(shutdown);
75TRANSLATE_FUNC(close);
76#undef TRANSLATE_FUNC
77 }
78
79 switch (error)
80 {
81 case boost::socket::WouldBlock :
82L(F("temporary failure in %s: operation would block\n") % func);
83return error;
84
85 case boost::socket::interrupted_function_call :
86L(F("temporary failure in %s: interrupted syscall\n") % func);
87return error;
88
89 case boost::socket::address_already_in_use :
90throw informative_failure(func + ": Address already in use");
91break;
92
93 case boost::socket::address_family_not_supported_by_protocol_family :
94throw informative_failure(func + ": Address family not supported by protocol family");
95break;
96
97 case boost::socket::address_not_available :
98throw informative_failure(func + ": Address not available");
99break;
100
101 case boost::socket::bad_address :
102throw informative_failure(func + ": Bad address");
103break;
104
105 case boost::socket::bad_protocol_option :
106throw informative_failure(func + ": Bad protocol option");
107break;
108
109 case boost::socket::cannot_assign_requested_address :
110throw informative_failure(func + ": Cannot assign requested address");
111break;
112
113 case boost::socket::cannot_send_after_socket_shutdown:
114throw informative_failure(func + ": Can't send after socket shutdown");
115break;
116
117 case boost::socket::connection_aborted :
118throw informative_failure(func + ": Connection aborted");
119break;
120
121 case boost::socket::connection_already_in_progress :
122throw informative_failure(func + ": Connection already in progress");
123break;
124
125 case boost::socket::connection_refused :
126throw informative_failure(func + ": Connection refused");
127break;
128
129 case boost::socket::connection_reset_by_peer :
130throw informative_failure(func + ": Connection reset by peer");
131break;
132
133 case boost::socket::connection_timed_out :
134throw informative_failure(func + ": Connection timed out");
135break;
136
137 case boost::socket::destination_address_required :
138throw informative_failure(func + ": Destination address required");
139break;
140
141 case boost::socket::graceful_shutdown_in_progress :
142throw informative_failure(func + ": Graceful shutdown in progress");
143break;
144
145 case boost::socket::host_is_down :
146throw informative_failure(func + ": Host is down");
147break;
148
149 case boost::socket::host_is_unreachable :
150throw informative_failure(func + ": Host is unreachable");
151break;
152
153 case boost::socket::host_not_found :
154throw informative_failure(func + ": Host not found");
155break;
156
157 case boost::socket::insufficient_memory_available :
158throw informative_failure(func + ": Insufficient memory available");
159break;
160
161 case boost::socket::invalid_argument :
162throw informative_failure(func + ": Invalid argument");
163break;
164
165 case boost::socket::message_too_long :
166throw informative_failure(func + ": Message too long");
167break;
168
169 case boost::socket::net_reset :
170throw informative_failure(func + ": net reset");
171break;
172
173 case boost::socket::network_dropped_connection_on_reset :
174throw informative_failure(func + ": Network dropped connection on reset");
175break;
176
177 case boost::socket::network_interface_is_not_configured :
178throw informative_failure(func + ": Network interface is not configured");
179break;
180
181 case boost::socket::network_is_down :
182throw informative_failure(func + ": Network is down");
183break;
184
185 case boost::socket::network_is_unreachable :
186throw informative_failure(func + ": Network is unreachable");
187break;
188
189 case boost::socket::network_subsystem_is_unavailable :
190throw informative_failure(func + ": Network subsystem is unavailable");
191break;
192
193 case boost::socket::no_buffer_space_available :
194throw informative_failure(func + ": No buffer space available");
195break;
196
197 case boost::socket::no_route_to_host :
198throw informative_failure(func + ": No route to host");
199break;
200
201 case boost::socket::nonauthoritative_host_not_found :
202throw informative_failure(func + ": Nonauthoritative host not found");
203break;
204
205 case boost::socket::not_a_valid_descriptor :
206throw informative_failure(func + ": not a valid descriptor");
207break;
208
209 case boost::socket::one_or_more_parameters_are_invalid :
210throw informative_failure(func + ": One or more parameters are invalid");
211break;
212
213 case boost::socket::operation_already_in_progress :
214throw informative_failure(func + ": Operation already in progress");
215break;
216
217 case boost::socket::operation_not_supported :
218throw informative_failure(func + ": Operation not supported");
219break;
220
221 case boost::socket::operation_not_supported_on_transport_endpoint :
222throw informative_failure(func + ": Operation not supported on transport endpoint");
223break;
224
225 case boost::socket::operation_now_in_progress :
226throw informative_failure(func + ": Operation now in progress");
227break;
228
229 case boost::socket::overlapped_operation_aborted :
230throw informative_failure(func + ": Overlapped operation aborted");
231break;
232
233 case boost::socket::permission_denied:
234throw informative_failure(func + ": Permission denied");
235break;
236
237 case boost::socket::protocol_family_not_supported :
238throw informative_failure(func + ": Protocol family not supported");
239break;
240
241 case boost::socket::protocol_not_available :
242throw informative_failure(func + ": Protocol not available");
243break;
244
245 case boost::socket::protocol_wrong_type_for_socket :
246throw informative_failure(func + ": Protocol wrong type for socket");
247break;
248
249 case boost::socket::socket_is_already_connected :
250throw informative_failure(func + ": Socket is already connected");
251break;
252
253 case boost::socket::socket_is_not_connected :
254throw informative_failure(func + ": Socket is not connected");
255break;
256
257 case boost::socket::socket_operation_on_nonsocket :
258throw informative_failure(func + ": Socket operation on nonsocket");
259break;
260
261 case boost::socket::socket_type_not_supported :
262throw informative_failure(func + ": Socket type not supported");
263break;
264
265 case boost::socket::software_caused_connection_abort :
266throw informative_failure(func + ": Software caused connection abort");
267break;
268
269 case boost::socket::specified_event_object_handle_is_invalid :
270throw informative_failure(func + ": Specified event object handle is invalid");
271break;
272
273 case boost::socket::system_call_failure :
274throw informative_failure(func + ": System call failure");
275break;
276
277 case boost::socket::this_is_a_nonrecoverable_error :
278throw informative_failure(func + ": This is a nonrecoverable error");
279break;
280
281 case boost::socket::too_many_open_files :
282throw informative_failure(func + ": Too many open files");
283break;
284
285 case boost::socket::too_many_processes :
286throw informative_failure(func + ": Too many processes");
287break;
288
289 case boost::socket::unknown_protocol :
290throw informative_failure(func + ": Unknown protocol");
291break;
292
293 case boost::socket::system_specific_error :
294throw informative_failure(func + ": System specific error");
295break;
296
297 default:
298throw informative_failure(func + ": Unknown error");
299break;
300 }
301 return boost::socket::system_specific_error;
302 }
303};
304
305typedef
306boost::socket::socket_base<monotone_socket_error_policy,
307 boost::socket::impl::default_socket_impl>
308monotone_socket_base;
309
310typedef
311boost::socket::data_socket<monotone_socket_base>
312monotone_data_socket;
313
314typedef
315boost::socket::connector<monotone_socket_base>
316monotone_connector;
317
318typedef
319monotone_connector::data_connection_t
320monotone_connection;
321
322typedef
323boost::socket::basic_socket_stream<char,
324 std::char_traits<char>,
325 monotone_data_socket>
326monotone_socket_stream;
327
328bool lookup_address(string const & dns_name,
329 string & ip4)
330{
331 static map<string, string> name_cache;
332 adns_state st;
333 adns_answer *answer;
334
335 if (dns_name.size() == 0)
336 return false;
337
338 map<string, string>::const_iterator it = name_cache.find(dns_name);
339 if (it != name_cache.end())
340 {
341 ip4 = it->second;
342 return true;
343 }
344
345 L(F("resolving name %s\n") % dns_name);
346
347 if (isdigit(dns_name[0]))
348 {
349 L(F("%s considered a raw IP address, returning\n") % dns_name);
350 ip4 = dns_name;
351 return true;
352 }
353
354 I(adns_init(&st, adns_if_noerrprint, 0) == 0);
355 if (adns_synchronous(st, dns_name.c_str(), adns_r_a,
356 (adns_queryflags)0, &answer) != 0)
357 {
358 L(F("IP sync lookup returned false\n"));
359 adns_finish(st);
360 return false;
361 }
362
363 if (answer->status != adns_s_ok)
364 {
365 L(F("IP sync lookup returned status %d\n") % answer->status);
366 free(answer);
367 adns_finish(st);
368 return false;
369 }
370
371 ip4 = string(inet_ntoa(*(answer->rrs.inaddr)));
372 name_cache.insert(make_pair(dns_name, ip4));
373
374 L(F("name %s resolved to IP %s\n") % dns_name % ip4);
375
376 free(answer);
377 adns_finish(st);
378 return true;
379}
380
381bool lookup_mxs(string const & dns_name,
382set< pair<int, string> > & mx_names)
383{
384 typedef shared_ptr< set< pair<int, string> > > mx_set_ptr;
385 static map<string, mx_set_ptr> mx_cache;
386 adns_state st;
387 adns_answer *answer;
388
389 if (dns_name.size() == 0)
390 return false;
391
392 map<string, mx_set_ptr>::const_iterator it
393 = mx_cache.find(dns_name);
394 if (it != mx_cache.end())
395 {
396 mx_names = *(it->second);
397 return true;
398 }
399
400 mx_set_ptr mxs = mx_set_ptr(new set< pair<int, string> >());
401
402 L(F("searching for MX records for %s\n") % dns_name);
403
404 if (isdigit(dns_name[0]))
405 {
406 L(F("%s considered a raw IP address, returning\n") % dns_name);
407 mx_names.insert(make_pair(10, dns_name));
408 return true;
409 }
410
411 I(adns_init(&st, adns_if_noerrprint, 0) == 0);
412 if (adns_synchronous(st, dns_name.c_str(), adns_r_mx_raw,
413 (adns_queryflags)0, &answer) != 0)
414 {
415 L(F("MX sync lookup returned false\n"));
416 adns_finish(st);
417 return false;
418 }
419
420 if (answer->status != adns_s_ok)
421 {
422 L(F("MX sync lookup returned status %d\n") % answer->status);
423 free(answer);
424 adns_finish(st);
425 return false;
426 }
427
428 L(F("MX sync lookup returned %d results\n") % answer->nrrs);
429 for (int i = 0; i < answer->nrrs; ++i)
430 {
431 string mx = string(answer->rrs.intstr[i].str);
432 int prio = answer->rrs.intstr[i].i;
433 mxs->insert(make_pair(prio, mx));
434 mx_names.insert(make_pair(prio, mx));
435 L(F("MX %s : %s priority %d\n") % dns_name % mx % prio);
436 }
437 mx_cache.insert(make_pair(dns_name, mxs));
438
439 free(answer);
440 adns_finish(st);
441 return true;
442}
443
444
445void open_connection(string const & proto_name,
446 string const & host_name_in,
447 unsigned long port_num_in,
448 monotone_connection & connection,
449 boost::shared_ptr<iostream> & stream,
450 app_state & app)
451{
452 using namespace boost::socket;
453 string resolved_host;
454
455 // check for tunnels
456 string host_name = host_name_in;
457 unsigned long port_num = port_num_in;
458 if (app.lua.hook_get_connect_addr(proto_name,
459 host_name_in,
460 port_num_in,
461 host_name,
462 port_num))
463 {
464 P(F("directing connection to %s:%d\n") % host_name % port_num);
465 }
466 else
467 {
468 host_name = host_name_in;
469 port_num = port_num_in;
470 }
471
472
473 ip4::address addr;
474 ip4::tcp_protocol proto;
475
476 {
477 // fixme: boost::socket is currently a little braindead, in that it
478 // only takes ascii strings representing IP4 ADDRESSES. duh.
479 if (! lookup_address(host_name, resolved_host))
480 throw oops ("host " + host_name + " not found");
481 L(F("resolved '%s' as '%s'\n") % host_name % resolved_host);
482 }
483
484 addr.ip(resolved_host.c_str());
485 addr.port(port_num);
486
487 L(F("connecting to port number %d\n") % port_num);
488
489 monotone_connector connector;
490 N(connector.connect(connection, proto, addr) == 0,
491 F("unable to connect to server %s:%d") % host_name % port_num);
492
493 boost::shared_ptr< monotone_socket_stream >
494 link(new monotone_socket_stream(connection));
495
496 N(link->good(),
497 F("bad network link, connecting to %s:%d") % host_name % port_num);
498
499 stream = link;
500}
501
502
503static void post_http_blob(url const & targ,
504 string const & blob,
505 string const & group,
506 string const & host,
507 unsigned long port,
508 string const & path,
509 app_state & app,
510 bool & posted_ok)
511{
512 rsa_keypair_id keyid;
513 base64< arc4<rsa_priv_key> > privkey;
514 base64<rsa_sha1_signature> signature_base64;
515 rsa_sha1_signature signature_plain;
516 hexenc<rsa_sha1_signature> signature_hex;
517
518 N(app.lua.hook_get_http_auth(targ, keyid),
519 F("missing pubkey for '%s'") % targ);
520
521 N(app.db.private_key_exists(keyid),
522 F("missing private key data for '%s'") % keyid);
523
524 app.db.get_key(keyid, privkey);
525 make_signature(app.lua, keyid, privkey, blob, signature_base64);
526 decode_base64 (signature_base64, signature_plain);
527 encode_hexenc (signature_plain, signature_hex);
528
529 try
530 {
531 monotone_connection connection;
532 boost::shared_ptr<iostream> stream;
533
534 bool is_proxy = false;
535 string connect_host_name = host;
536 unsigned long connect_port_num = port;
537 if (app.lua.hook_get_http_proxy(host, port,
538 connect_host_name,
539 connect_port_num))
540{
541 P(F("using proxy at %s:%d\n") % connect_host_name % connect_port_num);
542 is_proxy = true;
543}
544 else
545{
546 connect_host_name = host;
547 connect_port_num = port;
548}
549
550 open_connection("http", connect_host_name, connect_port_num,
551 connection, stream, app);
552
553 posted_ok = post_http_packets(group, keyid(),
554 signature_hex(), blob, host,
555 path, port, is_proxy, *stream);
556 }
557 catch (std::exception & e)
558 {
559 L(F("got exception from network: %s\n") % string(e.what()));
560 }
561}
562
563static void post_nntp_blob(url const & targ,
564 string const & blob,
565 string const & group,
566 string const & host,
567 unsigned long port,
568 app_state & app,
569 bool & posted_ok)
570{
571 string sender;
572 N(app.lua.hook_get_news_sender(targ, sender),
573 F("missing sender address for '%s'") % targ);
574
575 try
576 {
577 monotone_connection connection;
578 boost::shared_ptr<iostream> stream;
579 open_connection("nntp", host, port, connection, stream, app);
580 posted_ok = post_nntp_article(group, sender,
581 // FIXME: maybe some sort of more creative subject line?
582 "[MT] packets",
583 blob, *stream);
584 }
585 catch (std::exception & e)
586 {
587 L(F("got exception from network: %s\n") % string(e.what()));
588 }
589}
590
591static void post_smtp_blob(url const & targ,
592 string const & blob,
593 string const & user,
594 string const & host,
595 unsigned long port,
596 app_state & app,
597 bool & posted_ok)
598{
599 string sender, self_hostname;
600
601 N(app.lua.hook_get_mail_sender(targ, sender),
602 F("missing sender address for '%s'") % targ);
603
604 N(app.lua.hook_get_mail_hostname(targ, self_hostname),
605 F("missing self hostname for '%s'") % targ);
606
607 N(user != "",
608 F("empty recipient in mailto: URL %s") % targ);
609
610 try
611 {
612 set< pair<int,string> > mxs;
613 lookup_mxs (host, mxs);
614 if (mxs.empty())
615{
616 L(F("MX lookup is empty, using hostname %s\n") % host);
617 mxs.insert(make_pair(10, host));
618}
619
620 bool found_working_mx = false;
621 monotone_connection connection;
622 boost::shared_ptr<iostream> stream;
623 for (set< pair<int, string> >::const_iterator mx = mxs.begin();
624 mx != mxs.end(); ++mx)
625{
626 try
627 {
628 open_connection("smtp", mx->second, port, connection,
629 stream, app);
630 found_working_mx = true;
631 break;
632 }
633 catch (...)
634 {
635 L(F("exception while contacting MX %s\n") % mx->second);
636 }
637}
638
639 // FIXME: maybe hook to modify envelope params?
640 if (found_working_mx)
641posted_ok = post_smtp_article(self_hostname,
642 sender, user + "@" + host,
643 sender, user + "@" + host,
644 "[MT] packets",
645 blob, *stream);
646 }
647 catch (std::exception & e)
648 {
649 L(F("got exception from network: %s\n") % string(e.what()));
650 }
651}
652
653
654void post_queued_blobs_to_network(set<url> const & targets,
655 app_state & app)
656{
657
658 L(F("found %d targets for posting\n") % targets.size());
659 bool exception_during_posts = false;
660
661 ticker n_bytes("bytes");
662 ticker n_packets("packets");
663
664 for (set<url>::const_iterator targ = targets.begin();
665 targ != targets.end(); ++targ)
666 {
667 try
668{
669 ace user, host, group;
670 urlenc path;
671 string proto;
672 unsigned long port;
673 N(parse_url(*targ, proto, user, host, path, group, port),
674 F("cannot parse url '%s'") % *targ);
675
676 N((proto == "http" || proto == "nntp" || proto == "mailto"),
677 F("unknown protocol '%s', only know nntp, http and mailto") % proto);
678
679 size_t queue_count = 0;
680 app.db.get_queue_count(*targ, queue_count);
681
682 while (queue_count != 0)
683 {
684 L(F("found %d packets for %s\n") % queue_count % *targ);
685 string postbody;
686 vector<string> packets;
687 while (postbody.size() < constants::postsz
688 && packets.size() < queue_count)
689{
690 string tmp;
691 app.db.get_queued_content(*targ, packets.size(), tmp);
692 packets.push_back(tmp);
693 postbody.append(tmp);
694}
695
696 if (postbody != "")
697{
698 bool posted_ok = false;
699
700 L(F("posting %d packets for %s\n") % packets.size() % *targ);
701
702 if (proto == "http")
703 post_http_blob(*targ, postbody, group(), host(), port, path(), app, posted_ok);
704 else if (proto == "nntp")
705 post_nntp_blob(*targ, postbody, group(), host(), port, app, posted_ok);
706 else if (proto == "mailto")
707 post_smtp_blob(*targ, postbody, user(), host(), port, app, posted_ok);
708
709 if (!posted_ok)
710 throw informative_failure("unknown failure during post to " + (*targ)());
711
712 n_packets += packets.size();
713 n_bytes += postbody.size();
714 for (size_t i = 0; i < packets.size(); ++i)
715 app.db.delete_posting(*targ, 0);
716}
717
718 app.db.get_queue_count(*targ, queue_count);
719 }
720}
721 catch (informative_failure & i)
722{
723 W(F("%s\n") % i.what);
724 exception_during_posts = true;
725}
726 }
727 if (exception_during_posts)
728 W(F("errors occurred during posts\n"));
729}
730
731void fetch_queued_blobs_from_network(set<url> const & sources,
732 app_state & app)
733{
734
735 bool exception_during_fetches = false;
736 packet_db_writer dbw(app);
737
738 for(set<url>::const_iterator src = sources.begin();
739 src != sources.end(); ++src)
740 {
741 try
742{
743 ace user, host, group;
744 urlenc path;
745 string proto;
746 unsigned long port;
747 N(parse_url(*src, proto, user, host, path, group, port),
748 F("cannot parse url '%s'") % *src);
749
750 N((proto == "http" || proto == "nntp" || proto == "mailto"),
751 F("unknown protocol '%s', only know nntp, http and mailto") % proto);
752
753 if (proto == "mailto")
754 {
755 P(F("cannot fetch from mailto url %s, skipping\n") % *src);
756 continue;
757 }
758
759 P(F("fetching packets from group %s\n") % *src);
760
761 dbw.server.reset(*src);
762 transaction_guard guard(app.db);
763 if (proto == "http")
764 {
765 unsigned long maj, min;
766 app.db.get_sequences(*src, maj, min);
767 monotone_connection connection;
768 boost::shared_ptr<iostream> stream;
769
770 bool is_proxy = false;
771 string connect_host_name = host();
772 unsigned long connect_port_num = port;
773 if (app.lua.hook_get_http_proxy(host(), port,
774 connect_host_name,
775 connect_port_num))
776{
777 P(F("using proxy at %s:%d\n") % connect_host_name % connect_port_num);
778 is_proxy = true;
779}
780 else
781{
782 connect_host_name = host();
783 connect_port_num = port;
784}
785
786 open_connection("http", connect_host_name, connect_port_num,
787 connection, stream, app);
788 fetch_http_packets(group(), maj, min, dbw, host(), path(), port,
789 is_proxy, *stream);
790 app.db.put_sequences(*src, maj, min);
791 }
792
793 else if (proto == "nntp")
794 {
795 unsigned long maj, min;
796 app.db.get_sequences(*src, maj, min);
797 monotone_connection connection;
798 boost::shared_ptr<iostream> stream;
799 open_connection("nntp", host(), port, connection, stream, app);
800 fetch_nntp_articles(group(), min, dbw, *stream);
801 app.db.put_sequences(*src, maj, min);
802 }
803 guard.commit();
804}
805 catch (informative_failure & i)
806{
807 W(F("%s\n") % i.what);
808 exception_during_fetches = true;
809}
810 }
811 P(F("fetched %d packets\n") % dbw.count);
812 if (exception_during_fetches)
813 W(F("errors occurred during fetches\n"));
814}
815

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status