monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <map>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include "lexical_cast.hh"
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24
25#include "app_state.hh"
26#include "cert.hh"
27#include "constants.hh"
28#include "enumerator.hh"
29#include "keys.hh"
30#include "lua.hh"
31#include "merkle_tree.hh"
32#include "netcmd.hh"
33#include "netio.hh"
34#include "numeric_vocab.hh"
35#include "refiner.hh"
36#include "revision.hh"
37#include "sanity.hh"
38#include "transforms.hh"
39#include "ui.hh"
40#include "xdelta.hh"
41#include "epoch.hh"
42#include "platform.hh"
43#include "hmac.hh"
44#include "globish.hh"
45#include "uri.hh"
46
47#include "botan/botan.h"
48
49#include "netxx/address.h"
50#include "netxx/peer.h"
51#include "netxx/probe.h"
52#include "netxx/socket.h"
53#include "netxx/sockopt.h"
54#include "netxx/stream.h"
55#include "netxx/streamserver.h"
56#include "netxx/timeout.h"
57#include "netxx_pipe.hh"
58// TODO: things to do that will break protocol compatibility
59// -- need some way to upgrade anonymous to keyed pull, without user having
60// to explicitly specify which they want
61// just having a way to respond "access denied, try again" might work
62// but perhaps better to have the anonymous command include a note "I
63// _could_ use key <...> if you prefer", and if that would lead to more
64// access, could reply "I do prefer". (Does this lead to too much
65// information exposure? Allows anonymous people to probe what branches
66// a key has access to.)
67// -- "warning" packet type?
68// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
69// access to all of it, you just get the parts you have access to
70// (maybe with warnings about skipped branches). to do this right,
71// should have a way for the server to send back to the client "right,
72// you're not getting the following branches: ...", so the client will
73// not include them in its merkle trie.
74// -- add some sort of vhost field to the client's first packet, saying who
75// they expect to talk to
76
77//
78// This is the "new" network synchronization (netsync) system in
79// monotone. It is based on synchronizing pairs of merkle trees over an
80// interactive connection.
81//
82// A netsync process between peers treats each peer as either a source, a
83// sink, or both. When a peer is only a source, it will not write any new
// items to its database. When a peer is only a sink, it will not send any
85// items from its database. When a peer is both a source and sink, it may
86// send and write items freely.
87//
88// The post-state of a netsync is that each sink contains a superset of the
89// items in its corresponding source; when peers are behaving as both
90// source and sink, this means that the post-state of the sync is for the
91// peers to have identical item sets.
92//
93//
94// Data structure
95// --------------
96//
// Each node in a merkle tree contains a fixed number of slots. This number
98// is derived from a global parameter of the protocol -- the tree fanout --
99// such that the number of slots is 2^fanout. For now we will assume that
100// fanout is 4 thus there are 16 slots in a node, because this makes
101// illustration easier. The other parameter of the protocol is the size of
102// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
103//
104// Each slot in a merkle tree node is in one of 3 states:
105//
106// - empty
107// - leaf
108// - subtree
109//
110// In addition, each leaf contains a hash code which identifies an element
111// of the set being synchronized. Each subtree slot contains a hash code of
112// the node immediately beneath it in the merkle tree. Empty slots contain
113// no hash codes.
114//
115// Since empty slots have no hash code, they are represented implicitly by
116// a bitmap at the head of each merkle tree node. As an additional
117// integrity check, each merkle tree node contains a label indicating its
118// prefix in the tree, and a hash of its own contents.
119//
120// In total, then, the byte-level representation of a <160,4> merkle tree
121// node is as follows:
122//
123// 20 bytes - hash of the remaining bytes in the node
124// 1 byte - type of this node (manifest, file, key, mcert, fcert)
125// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
126// 0-20 bytes - the prefix of this node, 4 bits * level,
127// rounded up to a byte
128// 1-N bytes - number of leaves under this node (uleb128)
129// 4 bytes - slot-state bitmap of the node
130// 0-320 bytes - between 0 and 16 live slots in the node
131//
132// So, in the worst case such a node is 367 bytes, with these parameters.
133//
134//
135// Protocol
136// --------
137//
138// The protocol is a binary command-packet system over TCP; each packet
139// consists of a single byte which identifies the protocol version, a byte
140// which identifies the command name inside that version, a size_t sent as
141// a uleb128 indicating the length of the packet, that many bytes of
142// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
143// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
144// 20-byte random key chosen by the client after authentication (discussed
145// below). Decoding involves simply buffering until a sufficient number of
146// bytes are received, then advancing the buffer pointer. Any time an
147// integrity check (the HMAC) fails, the protocol is assumed to have lost
148// synchronization, and the connection is dropped. The parties are free to
149// drop the TCP stream at any point, if too much data is received or too
150// much idle time passes; no commitments or transactions are made.
151//
152//
153// Authentication and setup
154// ------------------------
155//
156// The exchange begins in a non-authenticated state. The server sends a
157// "hello <id> <nonce>" command, which identifies the server's RSA key and
158// issues a nonce which must be used for a subsequent authentication.
159//
160// The client then responds with either:
161//
162// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
163// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
164// role it wishes to play in the synchronization, identifies the pattern it
165// wishes to sync with, signs the previous nonce with its own key, and informs
166// the server of the HMAC key it wishes to use for this session (encrypted
167// with the server's public key); or
168//
169// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
170// <hmac key>" command, which identifies the role it wishes to play in the
171// synchronization, the pattern it wishes to sync with, and the HMAC key it
172// wishes to use for this session (also encrypted with the server's public
173// key).
174//
175// The server then replies with a "confirm" command, which contains no
176// other data but will only have the correct HMAC integrity code if the
177// server received and properly decrypted the HMAC key offered by the
178// client. This transitions the peers into an authenticated state and
179// begins epoch refinement. If epoch refinement and epoch transmission
180// succeed, the peers switch to data refinement and data transmission.
181//
182//
183// Refinement
184// ----------
185//
186// Refinement is executed by "refiners"; there is a refiner for each
187// set of 'items' being exchanged: epochs, keys, certs, and revisions.
188// When refinement starts, each party knows only their own set of
189// items; when refinement completes, each party has learned of the
190// complete set of items it needs to send, and a count of items it's
191// expecting to receive.
192//
193// For more details on the refinement process, see refiner.cc.
194//
195//
196// Transmission
197// ------------
198//
199// Once the set of items to send has been determined (for keys, certs, and
200// revisions) each peer switches into a transmission mode. This mode
201// involves walking the revision graph in ancestry-order and sending all
202// the items the local peer has which the remote one does not. Since the
203// remote and local peers both know all the items which need to be
204// transferred (they learned during refinement) they know what to wait for
205// and what to send. The mechanisms of the transmission phase (notably,
206// enumerator.cc) simply ensure that things are sent in the proper order,
207// and without over-filling the output buffer too much.
208//
209//
210// Shutdown
211// --------
212//
213// After transmission completes, one special command, "bye", is used to
214// shut down a connection gracefully. The shutdown sequence based on "bye"
215// commands is documented below in session::process_bye_cmd.
216//
217//
218// Note on epochs
219// --------------
220//
// One refinement and transmission phase precedes all the others: epochs.
222// Epochs are exchanged and compared in order to be sure that further
223// refinement and transmission (on certs and revisions) makes sense; they
224// are a sort of "immune system" to prevent incompatible databases (say
225// between rebuilds due to bugs in monotone) from cross-contaminating. The
226// later refinements are only kicked off *after* all epochs are received
227// and compare correctly.
228//
229//
230// Note on dense coding
231// --------------------
232//
233// This protocol is "raw binary" (non-text) because coding density is
234// actually important here, and each packet consists of very
235// information-dense material that you wouldn't have a hope of typing in,
236// or interpreting manually anyways.
237//
238
239using std::auto_ptr;
240using std::deque;
241using std::make_pair;
242using std::map;
243using std::min;
244using std::pair;
245using std::set;
246using std::string;
247using std::vector;
248
249using boost::shared_ptr;
250using boost::lexical_cast;
251
// A sync that a Lua hook asked the server to start on its own initiative.
// The four fields hold, verbatim, the four string arguments passed to the
// server_request_sync Lua extension.
struct server_initiated_sync_request
{
  std::string what;     // argument 1
  std::string address;  // argument 2
  std::string include;  // argument 3
  std::string exclude;  // argument 4
};

// Requests queued by server_request_sync, in arrival order.
std::deque<server_initiated_sync_request> server_initiated_sync_requests;
260LUAEXT(server_request_sync, )
261{
262 char const * w = luaL_checkstring(L, 1);
263 char const * a = luaL_checkstring(L, 2);
264 char const * i = luaL_checkstring(L, 3);
265 char const * e = luaL_checkstring(L, 4);
266 server_initiated_sync_request request;
267 request.what = string(w);
268 request.address = string(a);
269 request.include = string(i);
270 request.exclude = string(e);
271 server_initiated_sync_requests.push_back(request);
272 return 0;
273}
274
275static inline void
276require(bool check, string const & context)
277{
278 if (!check)
279 throw bad_decode(F("check of '%s' failed") % context);
280}
281
// Exception object thrown by session::error(); carries only the
// human-readable message.
struct netsync_error
{
  std::string msg;

  netsync_error(std::string const & s)
    : msg(s)
  {}
};
287
288struct
289session:
290 public refiner_callbacks,
291 public enumerator_callbacks
292{
293 protocol_role role;
294 protocol_voice const voice;
295 globish our_include_pattern;
296 globish our_exclude_pattern;
297 globish_matcher our_matcher;
298 app_state & app;
299
300 string peer_id;
301 shared_ptr<Netxx::StreamBase> str;
302
303 string_queue inbuf;
304 // deque of pair<string data, size_t cur_pos>
305 deque< pair<string,size_t> > outbuf;
306 // the total data stored in outbuf - this is
307 // used as a valve to stop too much data
308 // backing up
309 size_t outbuf_size;
310
311 netcmd cmd;
312 bool armed;
313 bool arm();
314
315 id remote_peer_key_hash;
316 rsa_keypair_id remote_peer_key_name;
317 netsync_session_key session_key;
318 chained_hmac read_hmac;
319 chained_hmac write_hmac;
320 bool authenticated;
321
322 time_t last_io_time;
323 auto_ptr<ticker> byte_in_ticker;
324 auto_ptr<ticker> byte_out_ticker;
325 auto_ptr<ticker> cert_in_ticker;
326 auto_ptr<ticker> cert_out_ticker;
327 auto_ptr<ticker> revision_in_ticker;
328 auto_ptr<ticker> revision_out_ticker;
329 size_t bytes_in, bytes_out;
330 size_t certs_in, certs_out;
331 size_t revs_in, revs_out;
332 size_t keys_in, keys_out;
333 // used to identify this session to the netsync hooks.
334 // We can't just use saved_nonce, because that's blank for all
335 // anonymous connections and could lead to confusion.
336 size_t session_id;
337 static size_t session_count;
338
339 vector<revision_id> written_revisions;
340 vector<rsa_keypair_id> written_keys;
341 vector<cert> written_certs;
342
343 id saved_nonce;
344
345 enum
346 {
347 working_state,
348 shutdown_state,
349 confirmed_state
350 }
351 protocol_state;
352
353 bool encountered_error;
354
355 static const int no_error = 200;
356 static const int partial_transfer = 211;
357 static const int no_transfer = 212;
358
359 static const int not_permitted = 412;
360 static const int unknown_key = 422;
361 static const int mixing_versions = 432;
362
363 static const int role_mismatch = 512;
364 static const int bad_command = 521;
365
366 static const int failed_identification = 532;
367 //static const int bad_data = 541;
368
369 int error_code;
370
371 bool set_totals;
372
373 // Interface to refinement.
374 refiner epoch_refiner;
375 refiner key_refiner;
376 refiner cert_refiner;
377 refiner rev_refiner;
378
379 // Interface to ancestry grovelling.
380 revision_enumerator rev_enumerator;
381
382 // Enumerator_callbacks methods.
383 set<file_id> file_items_sent;
384 bool process_this_rev(revision_id const & rev);
385 bool queue_this_cert(hexenc<id> const & c);
386 bool queue_this_file(hexenc<id> const & f);
387 void note_file_data(file_id const & f);
388 void note_file_delta(file_id const & src, file_id const & dst);
389 void note_rev(revision_id const & rev);
390 void note_cert(hexenc<id> const & c);
391
392 session(protocol_role role,
393 protocol_voice voice,
394 globish const & our_include_pattern,
395 globish const & our_exclude_pattern,
396 app_state & app,
397 string const & peer,
398 shared_ptr<Netxx::StreamBase> sock,
399 bool initiated_by_server = false);
400
401 virtual ~session();
402
403 id mk_nonce();
404 void mark_recent_io();
405
406 void set_session_key(string const & key);
407 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
408
409 void setup_client_tickers();
410 bool done_all_refinements();
411 bool queued_all_items();
412 bool received_all_items();
413 bool finished_working();
414 void maybe_step();
415 void maybe_say_goodbye(transaction_guard & guard);
416
417 void note_item_arrived(netcmd_item_type ty, id const & i);
418 void maybe_note_epochs_finished();
419 void note_item_sent(netcmd_item_type ty, id const & i);
420
421 Netxx::Probe::ready_type which_events() const;
422 bool read_some();
423 bool write_some();
424
425 void error(int errcode, string const & errmsg);
426
427 void write_netcmd_and_try_flush(netcmd const & cmd);
428
429 // Outgoing queue-writers.
430 void queue_bye_cmd(u8 phase);
431 void queue_error_cmd(string const & errmsg);
432 void queue_done_cmd(netcmd_item_type type, size_t n_items);
433 void queue_hello_cmd(rsa_keypair_id const & key_name,
434 base64<rsa_pub_key> const & pub_encoded,
435 id const & nonce);
436 void queue_anonymous_cmd(protocol_role role,
437 globish const & include_pattern,
438 globish const & exclude_pattern,
439 id const & nonce2,
440 base64<rsa_pub_key> server_key_encoded);
441 void queue_auth_cmd(protocol_role role,
442 globish const & include_pattern,
443 globish const & exclude_pattern,
444 id const & client,
445 id const & nonce1,
446 id const & nonce2,
447 string const & signature,
448 base64<rsa_pub_key> server_key_encoded);
449 void queue_confirm_cmd();
450 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
451 void queue_data_cmd(netcmd_item_type type,
452 id const & item,
453 string const & dat);
454 void queue_delta_cmd(netcmd_item_type type,
455 id const & base,
456 id const & ident,
457 delta const & del);
458
459 // Incoming dispatch-called methods.
460 bool process_error_cmd(string const & errmsg);
461 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
462 rsa_pub_key const & server_key,
463 id const & nonce);
464 bool process_bye_cmd(u8 phase, transaction_guard & guard);
465 bool process_anonymous_cmd(protocol_role role,
466 globish const & their_include_pattern,
467 globish const & their_exclude_pattern);
468 bool process_auth_cmd(protocol_role role,
469 globish const & their_include_pattern,
470 globish const & their_exclude_pattern,
471 id const & client,
472 id const & nonce1,
473 string const & signature);
474 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
475 bool process_done_cmd(netcmd_item_type type, size_t n_items);
476 bool process_data_cmd(netcmd_item_type type,
477 id const & item,
478 string const & dat);
479 bool process_delta_cmd(netcmd_item_type type,
480 id const & base,
481 id const & ident,
482 delta const & del);
483 bool process_usher_cmd(utf8 const & msg);
484
485 // The incoming dispatcher.
486 bool dispatch_payload(netcmd const & cmd,
487 transaction_guard & guard);
488
489 // Various helpers.
490 void assume_corresponding_role(protocol_role their_role);
491 void respond_to_confirm_cmd();
492 bool data_exists(netcmd_item_type type,
493 id const & item);
494 void load_data(netcmd_item_type type,
495 id const & item,
496 string & out);
497
498 void rebuild_merkle_trees(app_state & app,
499 set<branch_name> const & branches);
500
501 void send_all_data(netcmd_item_type ty, set<id> const & items);
502 void begin_service();
503 bool process(transaction_guard & guard);
504
505 bool initiated_by_server;
506};
507size_t session::session_count = 0;
508
509session::session(protocol_role role,
510 protocol_voice voice,
511 globish const & our_include_pattern,
512 globish const & our_exclude_pattern,
513 app_state & app,
514 string const & peer,
515 shared_ptr<Netxx::StreamBase> sock,
516 bool initiated_by_server) :
517 role(role),
518 voice(voice),
519 our_include_pattern(our_include_pattern),
520 our_exclude_pattern(our_exclude_pattern),
521 our_matcher(our_include_pattern, our_exclude_pattern),
522 app(app),
523 peer_id(peer),
524 str(sock),
525 inbuf(),
526 outbuf_size(0),
527 armed(false),
528 remote_peer_key_hash(""),
529 remote_peer_key_name(""),
530 session_key(constants::netsync_key_initializer),
531 read_hmac(netsync_session_key(constants::netsync_key_initializer),
532 app.opts.use_transport_auth),
533 write_hmac(netsync_session_key(constants::netsync_key_initializer),
534 app.opts.use_transport_auth),
535 authenticated(false),
536 last_io_time(::time(NULL)),
537 byte_in_ticker(NULL),
538 byte_out_ticker(NULL),
539 cert_in_ticker(NULL),
540 cert_out_ticker(NULL),
541 revision_in_ticker(NULL),
542 revision_out_ticker(NULL),
543 bytes_in(0), bytes_out(0),
544 certs_in(0), certs_out(0),
545 revs_in(0), revs_out(0),
546 keys_in(0), keys_out(0),
547 session_id(++session_count),
548 saved_nonce(""),
549 protocol_state(working_state),
550 encountered_error(false),
551 error_code(no_transfer),
552 set_totals(false),
553 epoch_refiner(epoch_item, voice, *this),
554 key_refiner(key_item, voice, *this),
555 cert_refiner(cert_item, voice, *this),
556 rev_refiner(revision_item, voice, *this),
557 rev_enumerator(*this, app),
558 initiated_by_server(initiated_by_server)
559{}
560
561session::~session()
562{
563 if (protocol_state == confirmed_state)
564 error_code = no_error;
565 else if (error_code == no_transfer &&
566 (revs_in || revs_out ||
567 certs_in || certs_out ||
568 keys_in || keys_out))
569 error_code = partial_transfer;
570
571 vector<cert> unattached_certs;
572 map<revision_id, vector<cert> > revcerts;
573 for (vector<revision_id>::iterator i = written_revisions.begin();
574 i != written_revisions.end(); ++i)
575 revcerts.insert(make_pair(*i, vector<cert>()));
576 for (vector<cert>::iterator i = written_certs.begin();
577 i != written_certs.end(); ++i)
578 {
579 map<revision_id, vector<cert> >::iterator j;
580 j = revcerts.find(revision_id(i->ident));
581 if (j == revcerts.end())
582 unattached_certs.push_back(*i);
583 else
584 j->second.push_back(*i);
585 }
586
587 // if (role == sink_role || role == source_and_sink_role)
588 if (!written_keys.empty()
589 || !written_revisions.empty()
590 || !written_certs.empty())
591 {
592
593 //Keys
594 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
595 i != written_keys.end(); ++i)
596 {
597 app.lua.hook_note_netsync_pubkey_received(*i, session_id);
598 }
599
600 //Revisions
601 for (vector<revision_id>::iterator i = written_revisions.begin();
602 i != written_revisions.end(); ++i)
603 {
604 vector<cert> & ctmp(revcerts[*i]);
605 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
606 for (vector<cert>::const_iterator j = ctmp.begin();
607 j != ctmp.end(); ++j)
608 {
609 cert_value vtmp;
610 decode_base64(j->value, vtmp);
611 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
612 }
613 revision_data rdat;
614 app.db.get_revision(*i, rdat);
615 app.lua.hook_note_netsync_revision_received(*i, rdat, certs,
616 session_id);
617 }
618
619 //Certs (not attached to a new revision)
620 for (vector<cert>::iterator i = unattached_certs.begin();
621 i != unattached_certs.end(); ++i)
622 {
623 cert_value tmp;
624 decode_base64(i->value, tmp);
625 app.lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
626 i->name, tmp, session_id);
627 }
628 }
629 app.lua.hook_note_netsync_end(session_id, error_code,
630 bytes_in, bytes_out,
631 certs_in, certs_out,
632 revs_in, revs_out,
633 keys_in, keys_out);
634}
635
636bool
637session::process_this_rev(revision_id const & rev)
638{
639 id item;
640 decode_hexenc(rev.inner(), item);
641 return (rev_refiner.items_to_send.find(item)
642 != rev_refiner.items_to_send.end());
643}
644
645bool
646session::queue_this_cert(hexenc<id> const & c)
647{
648 id item;
649 decode_hexenc(c, item);
650 return (cert_refiner.items_to_send.find(item)
651 != cert_refiner.items_to_send.end());
652}
653
654bool
655session::queue_this_file(hexenc<id> const & f)
656{
657 return file_items_sent.find(file_id(f)) == file_items_sent.end();
658}
659
660void
661session::note_file_data(file_id const & f)
662{
663 if (role == sink_role)
664 return;
665 file_data fd;
666 id item;
667 decode_hexenc(f.inner(), item);
668 app.db.get_file_version(f, fd);
669 queue_data_cmd(file_item, item, fd.inner()());
670 file_items_sent.insert(f);
671}
672
673void
674session::note_file_delta(file_id const & src, file_id const & dst)
675{
676 if (role == sink_role)
677 return;
678 file_delta fdel;
679 id fid1, fid2;
680 decode_hexenc(src.inner(), fid1);
681 decode_hexenc(dst.inner(), fid2);
682 app.db.get_arbitrary_file_delta(src, dst, fdel);
683 queue_delta_cmd(file_item, fid1, fid2, fdel.inner());
684 file_items_sent.insert(dst);
685}
686
687void
688session::note_rev(revision_id const & rev)
689{
690 if (role == sink_role)
691 return;
692 revision_t rs;
693 id item;
694 decode_hexenc(rev.inner(), item);
695 app.db.get_revision(rev, rs);
696 data tmp;
697 write_revision(rs, tmp);
698 queue_data_cmd(revision_item, item, tmp());
699}
700
701void
702session::note_cert(hexenc<id> const & c)
703{
704 if (role == sink_role)
705 return;
706 id item;
707 decode_hexenc(c, item);
708 revision<cert> cert;
709 string str;
710 app.db.get_revision_cert(c, cert);
711 write_cert(cert.inner(), str);
712 queue_data_cmd(cert_item, item, str);
713}
714
715
716id
717session::mk_nonce()
718{
719 I(this->saved_nonce().size() == 0);
720 char buf[constants::merkle_hash_length_in_bytes];
721 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
722 constants::merkle_hash_length_in_bytes);
723 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
724 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
725 return this->saved_nonce;
726}
727
728void
729session::mark_recent_io()
730{
731 last_io_time = ::time(NULL);
732}
733
734void
735session::set_session_key(string const & key)
736{
737 session_key = netsync_session_key(key);
738 read_hmac.set_key(session_key);
739 write_hmac.set_key(session_key);
740}
741
742void
743session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
744{
745 if (app.opts.use_transport_auth)
746 {
747 keypair our_kp;
748 load_key_pair(app, app.opts.signing_key, our_kp);
749 string hmac_key;
750 decrypt_rsa(app.lua, app.opts.signing_key, our_kp.priv,
751 hmac_key_encrypted, hmac_key);
752 set_session_key(hmac_key);
753 }
754}
755
756void
757session::setup_client_tickers()
758{
759 // xgettext: please use short message and try to avoid multibytes chars
760 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
761 // xgettext: please use short message and try to avoid multibytes chars
762 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
763 if (role == sink_role)
764 {
765 // xgettext: please use short message and try to avoid multibytes chars
766 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
767 // xgettext: please use short message and try to avoid multibytes chars
768 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
769 }
770 else if (role == source_role)
771 {
772 // xgettext: please use short message and try to avoid multibytes chars
773 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
774 // xgettext: please use short message and try to avoid multibytes chars
775 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
776 }
777 else
778 {
779 I(role == source_and_sink_role);
780 // xgettext: please use short message and try to avoid multibytes chars
781 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
782 // xgettext: please use short message and try to avoid multibytes chars
783 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
784 }
785}
786
787bool
788session::done_all_refinements()
789{
790 bool all = rev_refiner.done
791 && cert_refiner.done
792 && key_refiner.done
793 && epoch_refiner.done;
794
795 if (all && !set_totals)
796 {
797 if (cert_out_ticker.get())
798 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
799
800 if (revision_out_ticker.get())
801 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
802
803 if (cert_in_ticker.get())
804 cert_in_ticker->set_total(cert_refiner.items_to_receive);
805
806 if (revision_in_ticker.get())
807 revision_in_ticker->set_total(rev_refiner.items_to_receive);
808
809 set_totals = true;
810 }
811 return all;
812}
813
814
815
816bool
817session::received_all_items()
818{
819 if (role == source_role)
820 return true;
821 bool all = rev_refiner.items_to_receive == 0
822 && cert_refiner.items_to_receive == 0
823 && key_refiner.items_to_receive == 0
824 && epoch_refiner.items_to_receive == 0;
825 return all;
826}
827
828bool
829session::finished_working()
830{
831 bool all = done_all_refinements()
832 && received_all_items()
833 && queued_all_items()
834 && rev_enumerator.done();
835 return all;
836}
837
838bool
839session::queued_all_items()
840{
841 if (role == sink_role)
842 return true;
843 bool all = rev_refiner.items_to_send.empty()
844 && cert_refiner.items_to_send.empty()
845 && key_refiner.items_to_send.empty()
846 && epoch_refiner.items_to_send.empty();
847 return all;
848}
849
850
851void
852session::maybe_note_epochs_finished()
853{
854 // Maybe there are outstanding epoch requests.
855 // These only matter if we're in sink or source-and-sink mode.
856 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
857 return;
858
859 // And maybe we haven't even finished the refinement.
860 if (!epoch_refiner.done)
861 return;
862
863 // If we ran into an error -- say a mismatched epoch -- don't do any
864 // further refinements.
865 if (encountered_error)
866 return;
867
868 // But otherwise, we're ready to go. Start the next
869 // set of refinements.
870 if (voice == client_voice)
871 {
872 L(FL("epoch refinement finished; beginning other refinements"));
873 key_refiner.begin_refinement();
874 cert_refiner.begin_refinement();
875 rev_refiner.begin_refinement();
876 }
877 else
878 L(FL("epoch refinement finished"));
879}
880
881static void
882decrement_if_nonzero(netcmd_item_type ty,
883 size_t & n)
884{
885 if (n == 0)
886 {
887 string typestr;
888 netcmd_item_type_to_string(ty, typestr);
889 E(false, F("underflow on count of %s items to receive") % typestr);
890 }
891 --n;
892 if (n == 0)
893 {
894 string typestr;
895 netcmd_item_type_to_string(ty, typestr);
896 L(FL("count of %s items to receive has reached zero") % typestr);
897 }
898}
899
900void
901session::note_item_arrived(netcmd_item_type ty, id const & ident)
902{
903 switch (ty)
904 {
905 case cert_item:
906 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
907 if (cert_in_ticker.get() != NULL)
908 ++(*cert_in_ticker);
909 ++certs_in;
910 break;
911 case revision_item:
912 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
913 if (revision_in_ticker.get() != NULL)
914 ++(*revision_in_ticker);
915 ++revs_in;
916 break;
917 case key_item:
918 decrement_if_nonzero(ty, key_refiner.items_to_receive);
919 ++keys_in;
920 break;
921 case epoch_item:
922 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
923 break;
924 default:
925 // No ticker for other things.
926 break;
927 }
928}
929
930
931
932void
933session::note_item_sent(netcmd_item_type ty, id const & ident)
934{
935 switch (ty)
936 {
937 case cert_item:
938 cert_refiner.items_to_send.erase(ident);
939 if (cert_out_ticker.get() != NULL)
940 ++(*cert_out_ticker);
941 ++certs_out;
942 break;
943 case revision_item:
944 rev_refiner.items_to_send.erase(ident);
945 if (revision_out_ticker.get() != NULL)
946 ++(*revision_out_ticker);
947 ++revs_out;
948 break;
949 case key_item:
950 key_refiner.items_to_send.erase(ident);
951 ++keys_out;
952 break;
953 case epoch_item:
954 epoch_refiner.items_to_send.erase(ident);
955 break;
956 default:
957 // No ticker for other things.
958 break;
959 }
960}
961
962void
963session::write_netcmd_and_try_flush(netcmd const & cmd)
964{
965 if (!encountered_error)
966 {
967 string buf;
968 cmd.write(buf, write_hmac);
969 outbuf.push_back(make_pair(buf, 0));
970 outbuf_size += buf.size();
971 }
972 else
973 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
974 // FIXME: this helps keep the protocol pipeline full but it seems to
975 // interfere with initial and final sequences. careful with it.
976 // write_some();
977 // read_some();
978}
979
980// This method triggers a special "error unwind" mode to netsync. In this
981// mode, all received data is ignored, and no new data is queued. We simply
982// stay connected long enough for the current write buffer to be flushed, to
983// ensure that our peer receives the error message.
984// Affects read_some, write_some, and process .
985void
986session::error(int errcode, string const & errmsg)
987{
988 error_code = errcode;
989 throw netsync_error(errmsg);
990}
991
992Netxx::Probe::ready_type
993session::which_events() const
994{
995 // Only ask to read if we're not armed.
996 if (outbuf.empty())
997 {
998 if (inbuf.size() < constants::netcmd_maxsz && !armed)
999 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1000 else
1001 return Netxx::Probe::ready_oobd;
1002 }
1003 else
1004 {
1005 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1006 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1007 else
1008 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1009 }
1010}
1011
1012bool
1013session::read_some()
1014{
1015 I(inbuf.size() < constants::netcmd_maxsz);
1016 char tmp[constants::bufsz];
1017 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1018 if (count > 0)
1019 {
1020 L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id);
1021 if (encountered_error)
1022 {
1023 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1024 return true;
1025 }
1026 inbuf.append(tmp,count);
1027 mark_recent_io();
1028 if (byte_in_ticker.get() != NULL)
1029 (*byte_in_ticker) += count;
1030 bytes_in += count;
1031 return true;
1032 }
1033 else
1034 return false;
1035}
1036
1037bool
1038session::write_some()
1039{
1040 I(!outbuf.empty());
1041 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1042 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1043 min(writelen,
1044 constants::bufsz));
1045 if (count > 0)
1046 {
1047 if ((size_t)count == writelen)
1048 {
1049 outbuf_size -= outbuf.front().first.size();
1050 outbuf.pop_front();
1051 }
1052 else
1053 {
1054 outbuf.front().second += count;
1055 }
1056 L(FL("wrote %d bytes to fd %d (peer %s)")
1057 % count % str->get_socketfd() % peer_id);
1058 mark_recent_io();
1059 if (byte_out_ticker.get() != NULL)
1060 (*byte_out_ticker) += count;
1061 bytes_out += count;
1062 if (encountered_error && outbuf.empty())
1063 {
1064 // we've flushed our error message, so it's time to get out.
1065 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1066 return false;
1067 }
1068 return true;
1069 }
1070 else
1071 return false;
1072}
1073
1074// senders
1075
1076void
1077session::queue_error_cmd(string const & errmsg)
1078{
1079 L(FL("queueing 'error' command"));
1080 netcmd cmd;
1081 cmd.write_error_cmd(errmsg);
1082 write_netcmd_and_try_flush(cmd);
1083}
1084
1085void
1086session::queue_bye_cmd(u8 phase)
1087{
1088 L(FL("queueing 'bye' command, phase %d")
1089 % static_cast<size_t>(phase));
1090 netcmd cmd;
1091 cmd.write_bye_cmd(phase);
1092 write_netcmd_and_try_flush(cmd);
1093}
1094
1095void
1096session::queue_done_cmd(netcmd_item_type type,
1097 size_t n_items)
1098{
1099 string typestr;
1100 netcmd_item_type_to_string(type, typestr);
1101 L(FL("queueing 'done' command for %s (%d items)")
1102 % typestr % n_items);
1103 netcmd cmd;
1104 cmd.write_done_cmd(type, n_items);
1105 write_netcmd_and_try_flush(cmd);
1106}
1107
1108void
1109session::queue_hello_cmd(rsa_keypair_id const & key_name,
1110 base64<rsa_pub_key> const & pub_encoded,
1111 id const & nonce)
1112{
1113 rsa_pub_key pub;
1114 if (app.opts.use_transport_auth)
1115 decode_base64(pub_encoded, pub);
1116 cmd.write_hello_cmd(key_name, pub, nonce);
1117 write_netcmd_and_try_flush(cmd);
1118}
1119
1120void
1121session::queue_anonymous_cmd(protocol_role role,
1122 globish const & include_pattern,
1123 globish const & exclude_pattern,
1124 id const & nonce2,
1125 base64<rsa_pub_key> server_key_encoded)
1126{
1127 netcmd cmd;
1128 rsa_oaep_sha_data hmac_key_encrypted;
1129 if (app.opts.use_transport_auth)
1130 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1131 nonce2(), hmac_key_encrypted);
1132 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1133 hmac_key_encrypted);
1134 write_netcmd_and_try_flush(cmd);
1135 set_session_key(nonce2());
1136}
1137
1138void
1139session::queue_auth_cmd(protocol_role role,
1140 globish const & include_pattern,
1141 globish const & exclude_pattern,
1142 id const & client,
1143 id const & nonce1,
1144 id const & nonce2,
1145 string const & signature,
1146 base64<rsa_pub_key> server_key_encoded)
1147{
1148 netcmd cmd;
1149 rsa_oaep_sha_data hmac_key_encrypted;
1150 I(app.opts.use_transport_auth);
1151 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1152 nonce2(), hmac_key_encrypted);
1153 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1154 nonce1, hmac_key_encrypted, signature);
1155 write_netcmd_and_try_flush(cmd);
1156 set_session_key(nonce2());
1157}
1158
1159void
1160session::queue_confirm_cmd()
1161{
1162 netcmd cmd;
1163 cmd.write_confirm_cmd();
1164 write_netcmd_and_try_flush(cmd);
1165}
1166
1167void
1168session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1169{
1170 string typestr;
1171 hexenc<prefix> hpref;
1172 node.get_hex_prefix(hpref);
1173 netcmd_item_type_to_string(node.type, typestr);
1174 L(FL("queueing refinement %s of %s node '%s', level %d")
1175 % (ty == refinement_query ? "query" : "response")
1176 % typestr % hpref % static_cast<int>(node.level));
1177 netcmd cmd;
1178 cmd.write_refine_cmd(ty, node);
1179 write_netcmd_and_try_flush(cmd);
1180}
1181
1182void
1183session::queue_data_cmd(netcmd_item_type type,
1184 id const & item,
1185 string const & dat)
1186{
1187 string typestr;
1188 netcmd_item_type_to_string(type, typestr);
1189 hexenc<id> hid;
1190 encode_hexenc(item, hid);
1191
1192 if (role == sink_role)
1193 {
1194 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1195 % typestr % hid);
1196 return;
1197 }
1198
1199 L(FL("queueing %d bytes of data for %s item '%s'")
1200 % dat.size() % typestr % hid);
1201
1202 netcmd cmd;
1203 // TODO: This pair of functions will make two copies of a large
1204 // file, the first in cmd.write_data_cmd, and the second in
1205 // write_netcmd_and_try_flush when the data is copied from the
1206 // cmd.payload variable to the string buffer for output. This
1207 // double copy should be collapsed out, it may be better to use
1208 // a string_queue for output as well as input, as that will reduce
1209 // the amount of mallocs that happen when the string queue is large
1210 // enough to just store the data.
1211 cmd.write_data_cmd(type, item, dat);
1212 write_netcmd_and_try_flush(cmd);
1213 note_item_sent(type, item);
1214}
1215
1216void
1217session::queue_delta_cmd(netcmd_item_type type,
1218 id const & base,
1219 id const & ident,
1220 delta const & del)
1221{
1222 I(type == file_item);
1223 string typestr;
1224 netcmd_item_type_to_string(type, typestr);
1225 hexenc<id> base_hid;
1226 encode_hexenc(base, base_hid);
1227 hexenc<id> ident_hid;
1228 encode_hexenc(ident, ident_hid);
1229
1230 if (role == sink_role)
1231 {
1232 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1233 % typestr % base_hid % ident_hid);
1234 return;
1235 }
1236
1237 L(FL("queueing %s delta '%s' -> '%s'")
1238 % typestr % base_hid % ident_hid);
1239 netcmd cmd;
1240 cmd.write_delta_cmd(type, base, ident, del);
1241 write_netcmd_and_try_flush(cmd);
1242 note_item_sent(type, ident);
1243}
1244
1245
1246// processors
1247
1248bool
1249session::process_error_cmd(string const & errmsg)
1250{
1251 // "xxx string" with xxx being digits means there's an error code
1252 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1253 {
1254 try
1255 {
1256 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1257 if (err >= 100)
1258 {
1259 error_code = err;
1260 throw bad_decode(F("received network error: %s")
1261 % errmsg.substr(4));
1262 }
1263 }
1264 catch (boost::bad_lexical_cast)
1265 { // ok, so it wasn't a number
1266 }
1267 }
1268 throw bad_decode(F("received network error: %s") % errmsg);
1269}
1270
// Database variable domain under which process_hello_cmd records, per
// peer id, the hash of the key a server presented.
static const var_domain known_servers_domain = var_domain("known-servers");
1272
// Handle the 'hello' netcmd that opens a session.  The log messages and
// hook arguments below show this runs on the client, with the server's
// key name, public key and nonce as arguments.
//
// With transport authentication enabled, the server key's hash is compared
// with the one remembered for this peer in the known-servers domain: a
// mismatch aborts with "server key changed", a first contact records the
// hash.  The key is then stored in the database and its hash remembered as
// remote_peer_key_hash.
//
// Afterwards the merkle trees are rebuilt over the local branches matched
// by our_matcher, and either an 'auth' command (a signing key is
// configured) or an 'anonymous' command is queued.  Always returns true;
// failures are reported by exceptions.
bool
session::process_hello_cmd(rsa_keypair_id const & their_keyname,
                           rsa_pub_key const & their_key,
                           id const & nonce)
{
  // A hello is only valid before any peer identity or nonce is recorded.
  I(this->remote_peer_key_hash().size() == 0);
  I(this->saved_nonce().size() == 0);

  // Stays empty when transport authentication is disabled.
  base64<rsa_pub_key> their_key_encoded;

  if (app.opts.use_transport_auth)
    {
      hexenc<id> their_key_hash;
      encode_base64(their_key, their_key_encoded);
      key_hash_code(their_keyname, their_key_encoded, their_key_hash);
      L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash);
      // The remembered key hash is looked up by peer id.
      var_key their_key_key(known_servers_domain, var_name(peer_id));
      if (app.db.var_exists(their_key_key))
        {
          var_value expected_key_hash;
          app.db.get_var(their_key_key, expected_key_hash);
          if (expected_key_hash() != their_key_hash())
            {
              P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
                  "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
                  "it is also possible that the server key has just been changed\n"
                  "remote host sent key %s\n"
                  "I expected %s\n"
                  "'%s unset %s %s' overrides this check")
                % their_key_hash % expected_key_hash
                % ui.prog_name % their_key_key.first % their_key_key.second);
              E(false, F("server key changed"));
            }
        }
      else
        {
          // Trust on first use: remember this key hash for next time.
          P(F("first time connecting to server %s\n"
              "I'll assume it's really them, but you might want to double-check\n"
              "their key's fingerprint: %s") % peer_id % their_key_hash);
          app.db.set_var(their_key_key, var_value(their_key_hash()));
        }
      // put_key presumably returns true only when the key was newly
      // written -- TODO confirm against database::put_key.
      if (app.db.put_key(their_keyname, their_key_encoded))
        W(F("saving public key for %s to database") % their_keyname);

      {
        hexenc<id> hnonce;
        encode_hexenc(nonce, hnonce);
        L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
          % their_key_hash % hnonce);
      }

      I(app.db.public_key_exists(their_key_hash));

      // save their identity
      id their_key_hash_decoded;
      decode_hexenc(their_key_hash, their_key_hash_decoded);
      this->remote_peer_key_hash = their_key_hash_decoded;
    }

  // clients always include in the synchronization set, every branch that the
  // user requested
  set<branch_name> all_branches, ok_branches;
  app.get_project().get_branch_list(all_branches, false);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (our_matcher((*i)()))
        ok_branches.insert(*i);
    }
  rebuild_merkle_trees(app, ok_branches);

  if (!initiated_by_server)
    setup_client_tickers();

  if (app.opts.use_transport_auth &&
      app.opts.signing_key() != "")
    {
      // get our key pair
      keypair our_kp;
      load_key_pair(app, app.opts.signing_key, our_kp);

      // get the hash identifier for our pubkey
      hexenc<id> our_key_hash;
      id our_key_hash_raw;
      key_hash_code(app.opts.signing_key, our_kp.pub, our_key_hash);
      decode_hexenc(our_key_hash, our_key_hash_raw);

      // make a signature
      // (over the server's nonce, proving possession of our private key)
      base64<rsa_sha1_signature> sig;
      rsa_sha1_signature sig_raw;
      make_signature(app, app.opts.signing_key, our_kp.priv, nonce(), sig);
      decode_base64(sig, sig_raw);

      // make a new nonce of our own and send off the 'auth'
      queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
                     our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
                     their_key_encoded);
    }
  else
    {
      // No signing key, or transport authentication is off: go anonymous.
      queue_anonymous_cmd(this->role, our_include_pattern,
                          our_exclude_pattern, mk_nonce(), their_key_encoded);
    }

  app.lua.hook_note_netsync_start(session_id, "client", this->role,
                                  peer_id, their_keyname,
                                  our_include_pattern, our_exclude_pattern);

  return true;
}
1385
1386bool
1387session::process_anonymous_cmd(protocol_role their_role,
1388 globish const & their_include_pattern,
1389 globish const & their_exclude_pattern)
1390{
1391 // Internally netsync thinks in terms of sources and sinks. Users like
1392 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1393 //
1394 // We therefore use the read/write terminology when dealing with the UI:
1395 // if the user asks to run a "read only" service, this means they are
1396 // willing to be a source but not a sink.
1397 //
1398 // nb: The "role" here is the role the *client* wants to play
1399 // so we need to check that the opposite role is allowed for us,
1400 // in our this->role field.
1401 //
1402
1403 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1404 peer_id, rsa_keypair_id(),
1405 their_include_pattern, their_exclude_pattern);
1406
1407 // Client must be a sink and server must be a source (anonymous
1408 // read-only), unless transport auth is disabled.
1409 //
1410 // If running in no-transport-auth mode, we operate anonymously and
1411 // permit adoption of any role.
1412
1413 if (app.opts.use_transport_auth)
1414 {
1415 if (their_role != sink_role)
1416 {
1417 this->saved_nonce = id("");
1418 error(not_permitted,
1419 F("rejected attempt at anonymous connection for write").str());
1420 }
1421
1422 if (this->role == sink_role)
1423 {
1424 this->saved_nonce = id("");
1425 error(role_mismatch,
1426 F("rejected attempt at anonymous connection while running as sink").str());
1427 }
1428 }
1429
1430 set<branch_name> all_branches, ok_branches;
1431 app.get_project().get_branch_list(all_branches, false);
1432 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1433 for (set<branch_name>::const_iterator i = all_branches.begin();
1434 i != all_branches.end(); i++)
1435 {
1436 if (their_matcher((*i)()))
1437 if (app.opts.use_transport_auth &&
1438 !app.lua.hook_get_netsync_read_permitted((*i)()))
1439 {
1440 error(not_permitted,
1441 (F("anonymous access to branch '%s' denied by server") % *i).str());
1442 }
1443 else
1444 ok_branches.insert(*i);
1445 }
1446
1447 if (app.opts.use_transport_auth)
1448 {
1449 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1450 % their_include_pattern % their_exclude_pattern);
1451 this->role = source_role;
1452 }
1453 else
1454 {
1455 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1456 % their_include_pattern % their_exclude_pattern);
1457 assume_corresponding_role(their_role);
1458 }
1459
1460 rebuild_merkle_trees(app, ok_branches);
1461
1462 this->remote_peer_key_name = rsa_keypair_id("");
1463 this->authenticated = true;
1464 return true;
1465}
1466
1467void
1468session::assume_corresponding_role(protocol_role their_role)
1469{
1470 // Assume the (possibly degraded) opposite role.
1471 switch (their_role)
1472 {
1473 case source_role:
1474 I(this->role != source_role);
1475 this->role = sink_role;
1476 break;
1477
1478 case source_and_sink_role:
1479 I(this->role == source_and_sink_role);
1480 break;
1481
1482 case sink_role:
1483 I(this->role != sink_role);
1484 this->role = source_role;
1485 break;
1486 }
1487}
1488
// Server-side handler for an 'auth' netcmd: a client identifies itself by
// key hash CLIENT, echoes the nonce we sent (NONCE1) and supplies a
// SIGNATURE over it.
//
// Steps, in order: look up the client's public key (rejecting unknown
// keys), check the echoed nonce against saved_nonce, check that the
// requested role is compatible with ours and permitted by the read/write
// permission hooks, rebuild the merkle trees over the readable branches,
// and finally verify the signature.
//
// Rejections are reported through error(), which throws.  Returns true
// once the signature has been verified; the trailing `return false` can
// only be reached if error() were to return.
bool
session::process_auth_cmd(protocol_role their_role,
                          globish const & their_include_pattern,
                          globish const & their_exclude_pattern,
                          id const & client,
                          id const & nonce1,
                          string const & signature)
{
  // No peer identity may be recorded yet, and a nonce must be outstanding.
  I(this->remote_peer_key_hash().size() == 0);
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);

  hexenc<id> their_key_hash;
  encode_hexenc(client, their_key_hash);

  globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);

  if (!app.db.public_key_exists(their_key_hash))
    {
      // If it's not in the db, it still could be in the keystore if we
      // have the private key that goes with it.
      if (!app.keys.try_ensure_in_db(their_key_hash))
        {
          this->saved_nonce = id("");

          // Report the session start to the hook before rejecting it,
          // using a placeholder key name.
          app.lua.hook_note_netsync_start(session_id, "server", their_role,
                                          peer_id, rsa_keypair_id("-unknown-"),
                                          their_include_pattern,
                                          their_exclude_pattern);
          error(unknown_key,
                (F("remote public key hash '%s' is unknown") % their_key_hash).str());
        }
    }

  // Get their public key.
  rsa_keypair_id their_id;
  base64<rsa_pub_key> their_key;
  app.db.get_pubkey(their_key_hash, their_id, their_key);

  app.lua.hook_note_netsync_start(session_id, "server", their_role,
                                  peer_id, their_id,
                                  their_include_pattern, their_exclude_pattern);

  // Check that they replied with the nonce we asked for.
  if (!(nonce1 == this->saved_nonce))
    {
      this->saved_nonce = id("");
      error(failed_identification,
            F("detected replay attack in auth netcmd").str());
    }

  // Internally netsync thinks in terms of sources and sinks. users like
  // thinking of repositories as "readonly", "readwrite", or "writeonly".
  //
  // We therefore use the read/write terminology when dealing with the UI:
  // if the user asks to run a "read only" service, this means they are
  // willing to be a source but not a sink.
  //
  // nb: The "their_role" here is the role the *client* wants to play
  //     so we need to check that the opposite role is allowed for us,
  //     in our this->role field.

  // Client as sink, server as source (reading).

  if (their_role == sink_role || their_role == source_and_sink_role)
    {
      if (this->role != source_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }
    }

  // Every branch matching the client's patterns must be readable by this
  // key; a single denied branch rejects the whole request.
  set<branch_name> all_branches, ok_branches;
  app.get_project().get_branch_list(all_branches, false);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (their_matcher((*i)()))
        {
          if (!app.lua.hook_get_netsync_read_permitted((*i)(), their_id))
            {
              error(not_permitted,
                    (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
                     % their_id % their_include_pattern % their_exclude_pattern % *i).str());
            }
          else
            ok_branches.insert(*i);
        }
    }

  // If we're source_and_sink_role, continue even with no branches readable
  // eg. serve --db=empty.db
  P(F("allowed '%s' read permission for '%s' excluding '%s'")
    % their_id % their_include_pattern % their_exclude_pattern);

  // Client as source, server as sink (writing).

  if (their_role == source_role || their_role == source_and_sink_role)
    {
      if (this->role != sink_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      if (!app.lua.hook_get_netsync_write_permitted(their_id))
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s'")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      P(F("allowed '%s' write permission for '%s' excluding '%s'")
        % their_id % their_include_pattern % their_exclude_pattern);
    }

  // NOTE(review): the "allowed" messages above are printed and the merkle
  // trees below are rebuilt before the signature has been verified.
  rebuild_merkle_trees(app, ok_branches);

  // Save their identity.
  this->remote_peer_key_hash = client;

  // Check the signature.
  base64<rsa_sha1_signature> sig;
  encode_base64(rsa_sha1_signature(signature), sig);
  if (check_signature(app, their_id, their_key, nonce1(), sig))
    {
      // The signature over our nonce verifies: mark the session
      // authenticated, remember the peer's key name and take the role
      // that complements theirs.
      L(FL("client signature OK, accepting authentication"));
      this->authenticated = true;
      this->remote_peer_key_name = their_id;

      assume_corresponding_role(their_role);
      return true;
    }
  else
    {
      error(failed_identification, (F("bad client signature")).str());
    }
  return false;
}
1634
1635bool
1636session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1637{
1638 string typestr;
1639 netcmd_item_type_to_string(node.type, typestr);
1640 L(FL("processing refine cmd for %s node at level %d")
1641 % typestr % node.level);
1642
1643 switch (node.type)
1644 {
1645 case file_item:
1646 W(F("Unexpected 'refine' command on non-refined item type"));
1647 break;
1648
1649 case key_item:
1650 key_refiner.process_refinement_command(ty, node);
1651 break;
1652
1653 case revision_item:
1654 rev_refiner.process_refinement_command(ty, node);
1655 break;
1656
1657 case cert_item:
1658 cert_refiner.process_refinement_command(ty, node);
1659 break;
1660
1661 case epoch_item:
1662 epoch_refiner.process_refinement_command(ty, node);
1663 break;
1664 }
1665 return true;
1666}
1667
1668bool
1669session::process_bye_cmd(u8 phase,
1670 transaction_guard & guard)
1671{
1672
1673// Ideal shutdown
1674// ~~~~~~~~~~~~~~~
1675//
1676// I/O events state transitions
1677// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1678// client: C_WORKING
1679// server: S_WORKING
1680// 0. [refinement, data, deltas, etc.]
1681// client: C_SHUTDOWN
1682// (client checkpoints here)
1683// 1. client -> "bye 0"
1684// 2. "bye 0" -> server
1685// server: S_SHUTDOWN
1686// (server checkpoints here)
1687// 3. "bye 1" <- server
1688// 4. client <- "bye 1"
1689// client: C_CONFIRMED
1690// 5. client -> "bye 2"
1691// 6. "bye 2" -> server
1692// server: S_CONFIRMED
1693// 7. [server drops connection]
1694//
1695//
1696// Affects of I/O errors or disconnections
1697// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1698// C_WORKING: report error and fault
1699// S_WORKING: report error and recover
1700// C_SHUTDOWN: report error and fault
1701// S_SHUTDOWN: report success and recover
1702// (and warn that client might falsely see error)
1703// C_CONFIRMED: report success
1704// S_CONFIRMED: report success
1705
1706 switch (phase)
1707 {
1708 case 0:
1709 if (voice == server_voice &&
1710 protocol_state == working_state)
1711 {
1712 protocol_state = shutdown_state;
1713 guard.do_checkpoint();
1714 queue_bye_cmd(1);
1715 }
1716 else
1717 error(bad_command, "unexpected bye phase 0 received");
1718 break;
1719
1720 case 1:
1721 if (voice == client_voice &&
1722 protocol_state == shutdown_state)
1723 {
1724 protocol_state = confirmed_state;
1725 queue_bye_cmd(2);
1726 }
1727 else
1728 error(bad_command, "unexpected bye phase 1 received");
1729 break;
1730
1731 case 2:
1732 if (voice == server_voice &&
1733 protocol_state == shutdown_state)
1734 {
1735 protocol_state = confirmed_state;
1736 return false;
1737 }
1738 else
1739 error(bad_command, "unexpected bye phase 2 received");
1740 break;
1741
1742 default:
1743 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1744 }
1745
1746 return true;
1747}
1748
1749bool
1750session::process_done_cmd(netcmd_item_type type, size_t n_items)
1751{
1752 string typestr;
1753 netcmd_item_type_to_string(type, typestr);
1754 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1755 switch (type)
1756 {
1757 case file_item:
1758 W(F("Unexpected 'done' command on non-refined item type"));
1759 break;
1760
1761 case key_item:
1762 key_refiner.process_done_command(n_items);
1763 if (key_refiner.done && role != sink_role)
1764 send_all_data(key_item, key_refiner.items_to_send);
1765 break;
1766
1767 case revision_item:
1768 rev_refiner.process_done_command(n_items);
1769 break;
1770
1771 case cert_item:
1772 cert_refiner.process_done_command(n_items);
1773 break;
1774
1775 case epoch_item:
1776 epoch_refiner.process_done_command(n_items);
1777 if (epoch_refiner.done)
1778 {
1779 send_all_data(epoch_item, epoch_refiner.items_to_send);
1780 maybe_note_epochs_finished();
1781 }
1782 break;
1783 }
1784 return true;
1785}
1786
// React to a 'confirm' netcmd by starting refinement on the epoch refiner.
void
session::respond_to_confirm_cmd()
{
  epoch_refiner.begin_refinement();
}
1792
1793bool
1794session::data_exists(netcmd_item_type type,
1795 id const & item)
1796{
1797 hexenc<id> hitem;
1798 encode_hexenc(item, hitem);
1799 switch (type)
1800 {
1801 case key_item:
1802 return key_refiner.local_item_exists(item)
1803 || app.db.public_key_exists(hitem);
1804 case file_item:
1805 return app.db.file_version_exists(file_id(hitem));
1806 case revision_item:
1807 return rev_refiner.local_item_exists(item)
1808 || app.db.revision_exists(revision_id(hitem));
1809 case cert_item:
1810 return cert_refiner.local_item_exists(item)
1811 || app.db.revision_cert_exists(hitem);
1812 case epoch_item:
1813 return epoch_refiner.local_item_exists(item)
1814 || app.db.epoch_exists(epoch_id(hitem));
1815 }
1816 return false;
1817}
1818
1819void
1820session::load_data(netcmd_item_type type,
1821 id const & item,
1822 string & out)
1823{
1824 string typestr;
1825 netcmd_item_type_to_string(type, typestr);
1826 hexenc<id> hitem;
1827 encode_hexenc(item, hitem);
1828
1829 if (!data_exists(type, item))
1830 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1831 % typestr % hitem);
1832
1833 switch (type)
1834 {
1835 case epoch_item:
1836 {
1837 branch_name branch;
1838 epoch_data epoch;
1839 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1840 write_epoch(branch, epoch, out);
1841 }
1842 break;
1843 case key_item:
1844 {
1845 rsa_keypair_id keyid;
1846 base64<rsa_pub_key> pub_encoded;
1847 app.db.get_pubkey(hitem, keyid, pub_encoded);
1848 L(FL("public key '%s' is also called '%s'") % hitem % keyid);
1849 write_pubkey(keyid, pub_encoded, out);
1850 }
1851 break;
1852
1853 case revision_item:
1854 {
1855 revision_data mdat;
1856 data dat;
1857 app.db.get_revision(revision_id(hitem), mdat);
1858 out = mdat.inner()();
1859 }
1860 break;
1861
1862 case file_item:
1863 {
1864 file_data fdat;
1865 data dat;
1866 app.db.get_file_version(file_id(hitem), fdat);
1867 out = fdat.inner()();
1868 }
1869 break;
1870
1871 case cert_item:
1872 {
1873 revision<cert> c;
1874 app.db.get_revision_cert(hitem, c);
1875 string tmp;
1876 write_cert(c.inner(), out);
1877 }
1878 break;
1879 }
1880}
1881
// Handle a 'data' netcmd carrying the full contents DAT of ITEM (of kind
// TYPE).  The arrival is always noted; an item we already have is then
// skipped.  Otherwise the payload is parsed, keys and certs are checked
// against the announced hash, and the item is stored in the database.
// Items that put_* reports as written are remembered in the written_*
// lists.  Returns true; failures are reported through bad_decode or
// error().
bool
session::process_data_cmd(netcmd_item_type type,
                          id const & item,
                          string const & dat)
{
  hexenc<id> hitem;
  encode_hexenc(item, hitem);

  string typestr;
  netcmd_item_type_to_string(type, typestr);

  note_item_arrived(type, item);
  if (data_exists(type, item))
    {
      L(FL("%s '%s' already exists in our database") % typestr % hitem);
      // Epoch completion is re-evaluated even for an epoch we already had.
      if (type == epoch_item)
        maybe_note_epochs_finished();
      return true;
    }

  switch (type)
    {
    case epoch_item:
      {
        branch_name branch;
        epoch_data epoch;
        read_epoch(dat, branch, epoch);
        L(FL("received epoch %s for branch %s") % epoch % branch);
        map<branch_name, epoch_data> epochs;
        app.db.get_epochs(epochs);
        map<branch_name, epoch_data>::const_iterator i;
        i = epochs.find(branch);
        if (i == epochs.end())
          {
            L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
            app.db.set_epoch(branch, epoch);
          }
        else
          {
            L(FL("branch %s already has an epoch; checking") % branch);
            // If we get here, then we know that the epoch must be
            // different, because if it were the same then the
            // if (epoch_exists()) branch up above would have been taken.
            // if somehow this is wrong, then we have broken epoch
            // hashing or something, which is very dangerous, so play it
            // safe...
            I(!(i->second == epoch));

            // It is safe to call 'error' here, because if we get here,
            // then the current netcmd packet cannot possibly have
            // written anything to the database.
            //
            // The two epochs are ordered so that the server's is always
            // printed first, whichever side we are.
            error(mixing_versions,
                  (F("Mismatched epoch on branch %s."
                     " Server has '%s', client has '%s'.")
                   % branch
                   % (voice == server_voice ? i->second : epoch)
                   % (voice == server_voice ? epoch : i->second)).str());
          }
      }
      maybe_note_epochs_finished();
      break;

    case key_item:
      {
        rsa_keypair_id keyid;
        base64<rsa_pub_key> pub;
        read_pubkey(dat, keyid, pub);
        // Recompute the hash from the payload; it must match the id the
        // peer announced.
        hexenc<id> tmp;
        key_hash_code(keyid, pub, tmp);
        if (! (tmp == hitem))
          throw bad_decode(F("hash check failed for public key '%s' (%s);"
                             " wanted '%s' got '%s'")
                           % hitem % keyid % hitem % tmp);
        if (app.db.put_key(keyid, pub))
          written_keys.push_back(keyid);
      }
      break;

    case cert_item:
      {
        cert c;
        read_cert(dat, c);
        // Same hash check as for keys.
        hexenc<id> tmp;
        cert_hash_code(c, tmp);
        if (! (tmp == hitem))
          throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
        if (app.db.put_revision_cert(revision<cert>(c)))
          written_certs.push_back(c);
      }
      break;

    case revision_item:
      {
        L(FL("received revision '%s'") % hitem);
        if (app.db.put_revision(revision_id(hitem), revision_data(dat)))
          written_revisions.push_back(revision_id(hitem));
      }
      break;

    case file_item:
      {
        L(FL("received file '%s'") % hitem);
        app.db.put_file(file_id(hitem), file_data(dat));
      }
      break;
    }
  return true;
}
1990
1991bool
1992session::process_delta_cmd(netcmd_item_type type,
1993 id const & base,
1994 id const & ident,
1995 delta const & del)
1996{
1997 string typestr;
1998 netcmd_item_type_to_string(type, typestr);
1999 hexenc<id> hbase, hident;
2000 encode_hexenc(base, hbase);
2001 encode_hexenc(ident, hident);
2002
2003 pair<id,id> id_pair = make_pair(base, ident);
2004
2005 note_item_arrived(type, ident);
2006
2007 switch (type)
2008 {
2009 case file_item:
2010 {
2011 file_id src_file(hbase), dst_file(hident);
2012 app.db.put_file_version(src_file, dst_file, file_delta(del));
2013 }
2014 break;
2015
2016 default:
2017 L(FL("ignoring delta received for item type %s") % typestr);
2018 break;
2019 }
2020 return true;
2021}
2022
2023bool
2024session::process_usher_cmd(utf8 const & msg)
2025{
2026 if (msg().size())
2027 {
2028 if (msg()[0] == '!')
2029 P(F("Received warning from usher: %s") % msg().substr(1));
2030 else
2031 L(FL("Received greeting from usher: %s") % msg().substr(1));
2032 }
2033 netcmd cmdout;
2034 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2035 write_netcmd_and_try_flush(cmdout);
2036 L(FL("Sent reply."));
2037 return true;
2038}
2039
2040
2041void
2042session::send_all_data(netcmd_item_type ty, set<id> const & items)
2043{
2044 string typestr;
2045 netcmd_item_type_to_string(ty, typestr);
2046
2047 // Use temporary; passed arg will be invalidated during iteration.
2048 set<id> tmp = items;
2049
2050 for (set<id>::const_iterator i = tmp.begin();
2051 i != tmp.end(); ++i)
2052 {
2053 hexenc<id> hitem;
2054 encode_hexenc(*i, hitem);
2055
2056 if (data_exists(ty, *i))
2057 {
2058 string out;
2059 load_data(ty, *i, out);
2060 queue_data_cmd(ty, *i, out);
2061 }
2062 }
2063}
2064
2065bool
2066session::dispatch_payload(netcmd const & cmd,
2067 transaction_guard & guard)
2068{
2069
2070 switch (cmd.get_cmd_code())
2071 {
2072
2073 case error_cmd:
2074 {
2075 string errmsg;
2076 cmd.read_error_cmd(errmsg);
2077 return process_error_cmd(errmsg);
2078 }
2079 break;
2080
2081 case hello_cmd:
2082 require(! authenticated, "hello netcmd received when not authenticated");
2083 require(voice == client_voice, "hello netcmd received in client voice");
2084 {
2085 rsa_keypair_id server_keyname;
2086 rsa_pub_key server_key;
2087 id nonce;
2088 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2089 return process_hello_cmd(server_keyname, server_key, nonce);
2090 }
2091 break;
2092
2093 case bye_cmd:
2094 require(authenticated, "bye netcmd received when not authenticated");
2095 {
2096 u8 phase;
2097 cmd.read_bye_cmd(phase);
2098 return process_bye_cmd(phase, guard);
2099 }
2100 break;
2101
2102 case anonymous_cmd:
2103 require(! authenticated, "anonymous netcmd received when not authenticated");
2104 require(voice == server_voice, "anonymous netcmd received in server voice");
2105 require(role == source_role ||
2106 role == source_and_sink_role,
2107 "anonymous netcmd received in source or source/sink role");
2108 {
2109 protocol_role role;
2110 globish their_include_pattern, their_exclude_pattern;
2111 rsa_oaep_sha_data hmac_key_encrypted;
2112 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2113 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2114 "in %s mode\n")
2115 % their_include_pattern % their_exclude_pattern
2116 % (role == source_and_sink_role ? _("source and sink") :
2117 (role == source_role ? _("source") : _("sink"))));
2118
2119 set_session_key(hmac_key_encrypted);
2120 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2121 return false;
2122 queue_confirm_cmd();
2123 return true;
2124 }
2125 break;
2126
2127 case auth_cmd:
2128 require(! authenticated, "auth netcmd received when not authenticated");
2129 require(voice == server_voice, "auth netcmd received in server voice");
2130 {
2131 protocol_role role;
2132 string signature;
2133 globish their_include_pattern, their_exclude_pattern;
2134 id client, nonce1, nonce2;
2135 rsa_oaep_sha_data hmac_key_encrypted;
2136 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2137 client, nonce1, hmac_key_encrypted, signature);
2138
2139 hexenc<id> their_key_hash;
2140 encode_hexenc(client, their_key_hash);
2141 hexenc<id> hnonce1;
2142 encode_hexenc(nonce1, hnonce1);
2143
2144 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2145 "exclude '%s' in %s mode with nonce1 '%s'\n")
2146 % their_key_hash % their_include_pattern % their_exclude_pattern
2147 % (role == source_and_sink_role ? _("source and sink") :
2148 (role == source_role ? _("source") : _("sink")))
2149 % hnonce1);
2150
2151 set_session_key(hmac_key_encrypted);
2152
2153 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2154 client, nonce1, signature))
2155 return false;
2156 queue_confirm_cmd();
2157 return true;
2158 }
2159 break;
2160
2161 case confirm_cmd:
2162 require(! authenticated, "confirm netcmd received when not authenticated");
2163 require(voice == client_voice, "confirm netcmd received in client voice");
2164 {
2165 string signature;
2166 cmd.read_confirm_cmd();
2167 this->authenticated = true;
2168 respond_to_confirm_cmd();
2169 return true;
2170 }
2171 break;
2172
2173 case refine_cmd:
2174 require(authenticated, "refine netcmd received when authenticated");
2175 {
2176 merkle_node node;
2177 refinement_type ty;
2178 cmd.read_refine_cmd(ty, node);
2179 return process_refine_cmd(ty, node);
2180 }
2181 break;
2182
2183 case done_cmd:
2184 require(authenticated, "done netcmd received when not authenticated");
2185 {
2186 size_t n_items;
2187 netcmd_item_type type;
2188 cmd.read_done_cmd(type, n_items);
2189 return process_done_cmd(type, n_items);
2190 }
2191 break;
2192
2193 case data_cmd:
2194 require(authenticated, "data netcmd received when not authenticated");
2195 require(role == sink_role ||
2196 role == source_and_sink_role,
2197 "data netcmd received in source or source/sink role");
2198 {
2199 netcmd_item_type type;
2200 id item;
2201 string dat;
2202 cmd.read_data_cmd(type, item, dat);
2203 return process_data_cmd(type, item, dat);
2204 }
2205 break;
2206
2207 case delta_cmd:
2208 require(authenticated, "delta netcmd received when not authenticated");
2209 require(role == sink_role ||
2210 role == source_and_sink_role,
2211 "delta netcmd received in source or source/sink role");
2212 {
2213 netcmd_item_type type;
2214 id base, ident;
2215 delta del;
2216 cmd.read_delta_cmd(type, base, ident, del);
2217 return process_delta_cmd(type, base, ident, del);
2218 }
2219 break;
2220
2221 case usher_cmd:
2222 {
2223 utf8 greeting;
2224 cmd.read_usher_cmd(greeting);
2225 return process_usher_cmd(greeting);
2226 }
2227 break;
2228
2229 case usher_reply_cmd:
2230 return false; // Should not happen.
2231 break;
2232 }
2233 return false;
2234}
2235
2236// This kicks off the whole cascade starting from "hello".
2237void
2238session::begin_service()
2239{
2240 keypair kp;
2241 if (app.opts.use_transport_auth)
2242 app.keys.get_key_pair(app.opts.signing_key, kp);
2243 queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce());
2244}
2245
2246void
2247session::maybe_step()
2248{
2249 while (done_all_refinements()
2250 && !rev_enumerator.done()
2251 && outbuf_size < constants::bufsz * 10)
2252 {
2253 rev_enumerator.step();
2254 }
2255}
2256
2257void
2258session::maybe_say_goodbye(transaction_guard & guard)
2259{
2260 if (voice == client_voice
2261 && protocol_state == working_state
2262 && finished_working())
2263 {
2264 protocol_state = shutdown_state;
2265 guard.do_checkpoint();
2266 queue_bye_cmd(0);
2267 }
2268}
2269
2270bool
2271session::arm()
2272{
2273 if (!armed)
2274 {
2275 // Don't pack the buffer unnecessarily.
2276 if (outbuf_size > constants::bufsz * 10)
2277 return false;
2278
2279 if (cmd.read(inbuf, read_hmac))
2280 {
2281 armed = true;
2282 }
2283 }
2284 return armed;
2285}
2286
2287bool session::process(transaction_guard & guard)
2288{
2289 if (encountered_error)
2290 return true;
2291 try
2292 {
2293 if (!arm())
2294 return true;
2295
2296 armed = false;
2297 L(FL("processing %d byte input buffer from peer %s")
2298 % inbuf.size() % peer_id);
2299
2300 size_t sz = cmd.encoded_size();
2301 bool ret = dispatch_payload(cmd, guard);
2302
2303 if (inbuf.size() >= constants::netcmd_maxsz)
2304 W(F("input buffer for peer %s is overfull "
2305 "after netcmd dispatch") % peer_id);
2306
2307 guard.maybe_checkpoint(sz);
2308
2309 if (!ret)
2310 L(FL("finishing processing with '%d' packet")
2311 % cmd.get_cmd_code());
2312 return ret;
2313 }
2314 catch (bad_decode & bd)
2315 {
2316 W(F("protocol error while processing peer %s: '%s'")
2317 % peer_id % bd.what);
2318 return false;
2319 }
2320 catch (netsync_error & err)
2321 {
2322 W(F("error: %s") % err.msg);
2323 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2324 encountered_error = true;
2325 return true; // Don't terminate until we've send the error_cmd.
2326 }
2327}
2328
2329
2330static shared_ptr<Netxx::StreamBase>
2331build_stream_to_server(app_state & app,
2332 globish const & include_pattern,
2333 globish const & exclude_pattern,
2334 utf8 const & address,
2335 Netxx::port_type default_port,
2336 Netxx::Timeout timeout)
2337{
2338 shared_ptr<Netxx::StreamBase> server;
2339 uri u;
2340 vector<string> argv;
2341
2342 parse_uri(address(), u);
2343 if (app.lua.hook_get_netsync_connect_command(u,
2344 include_pattern,
2345 exclude_pattern,
2346 global_sanity.debug_p(),
2347 argv))
2348 {
2349 I(argv.size() > 0);
2350 string cmd = argv[0];
2351 argv.erase(argv.begin());
2352 app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u);
2353 return shared_ptr<Netxx::StreamBase>
2354 (new Netxx::PipeStream(cmd, argv));
2355
2356 }
2357 else
2358 {
2359#ifdef USE_IPV6
2360 bool use_ipv6=true;
2361#else
2362 bool use_ipv6=false;
2363#endif
2364 Netxx::Address addr(address().c_str(),
2365 default_port, use_ipv6);
2366 return shared_ptr<Netxx::StreamBase>
2367 (new Netxx::Stream(addr, timeout));
2368 }
2369}
2370
2371static void
2372call_server(protocol_role role,
2373 globish const & include_pattern,
2374 globish const & exclude_pattern,
2375 app_state & app,
2376 std::list<utf8> const & addresses,
2377 Netxx::port_type default_port,
2378 unsigned long timeout_seconds)
2379{
2380 Netxx::PipeCompatibleProbe probe;
2381 transaction_guard guard(app.db);
2382 I(addresses.size() == 1);
2383 utf8 address(*addresses.begin());
2384
2385 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2386
2387 P(F("connecting to %s") % address);
2388
2389 shared_ptr<Netxx::StreamBase> server
2390 = build_stream_to_server(app,
2391 include_pattern,
2392 exclude_pattern,
2393 address, default_port,
2394 timeout);
2395
2396
2397 // 'false' here means not to revert changes when the SockOpt
2398 // goes out of scope.
2399 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2400 socket_options.set_non_blocking();
2401
2402 session sess(role, client_voice,
2403 include_pattern,
2404 exclude_pattern,
2405 app, address(), server);
2406
2407 while (true)
2408 {
2409 bool armed = false;
2410 try
2411 {
2412 armed = sess.arm();
2413 }
2414 catch (bad_decode & bd)
2415 {
2416 E(false, F("protocol error while processing peer %s: '%s'")
2417 % sess.peer_id % bd.what);
2418 }
2419
2420 sess.maybe_step();
2421 sess.maybe_say_goodbye(guard);
2422
2423 probe.clear();
2424 probe.add(*(sess.str), sess.which_events());
2425 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2426 Netxx::Probe::ready_type event = res.second;
2427 Netxx::socket_type fd = res.first;
2428
2429 if (fd == -1 && !armed)
2430 {
2431 E(false, (F("timed out waiting for I/O with "
2432 "peer %s, disconnecting")
2433 % sess.peer_id));
2434 }
2435
2436 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2437
2438 if (event & Netxx::Probe::ready_read)
2439 all_io_clean = all_io_clean && sess.read_some();
2440
2441 if (event & Netxx::Probe::ready_write)
2442 all_io_clean = all_io_clean && sess.write_some();
2443
2444 if (armed)
2445 if (!sess.process(guard))
2446 {
2447 // Commit whatever work we managed to accomplish anyways.
2448 guard.commit();
2449
2450 // We failed during processing. This should only happen in
2451 // client voice when we have a decode exception, or received an
2452 // error from our server (which is translated to a decode
2453 // exception). We call these cases E() errors.
2454 E(false, F("processing failure while talking to "
2455 "peer %s, disconnecting")
2456 % sess.peer_id);
2457 return;
2458 }
2459
2460 if (!all_io_clean)
2461 {
2462 // Commit whatever work we managed to accomplish anyways.
2463 guard.commit();
2464
2465 // We had an I/O error. We must decide if this represents a
2466 // user-reported error or a clean disconnect. See protocol
2467 // state diagram in session::process_bye_cmd.
2468
2469 if (sess.protocol_state == session::confirmed_state)
2470 {
2471 P(F("successful exchange with %s")
2472 % sess.peer_id);
2473 return;
2474 }
2475 else if (sess.encountered_error)
2476 {
2477 P(F("peer %s disconnected after we informed them of error")
2478 % sess.peer_id);
2479 return;
2480 }
2481 else
2482 E(false, (F("I/O failure while talking to "
2483 "peer %s, disconnecting")
2484 % sess.peer_id));
2485 }
2486 }
2487}
2488
2489static void
2490drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2491 Netxx::socket_type fd)
2492{
2493 // This is a bit of a hack. Initially all "file descriptors" in
2494 // netsync were full duplex, so we could get away with indexing
2495 // sessions by their file descriptor.
2496 //
2497 // When using pipes in unix, it's no longer true: a session gets
2498 // entered in the session map under its read pipe fd *and* its write
2499 // pipe fd. When we're in such a situation the socket fd is "-1" and
2500 // we downcast to a PipeStream and use its read+write fds.
2501 //
2502 // When using pipes in windows, we use a full duplex pipe (named
2503 // pipe) so the socket-like abstraction holds.
2504
2505 I(fd != -1);
2506 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2507 I(i != sessions.end());
2508 shared_ptr<session> sess = i->second;
2509 fd = sess->str->get_socketfd();
2510 if (fd != -1)
2511 {
2512 sessions.erase(fd);
2513 }
2514 else
2515 {
2516 shared_ptr<Netxx::PipeStream> pipe =
2517 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2518 I(static_cast<bool>(pipe));
2519 I(pipe->get_writefd() != -1);
2520 I(pipe->get_readfd() != -1);
2521 sessions.erase(pipe->get_readfd());
2522 sessions.erase(pipe->get_writefd());
2523 }
2524}
2525
2526static void
2527arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2528 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2529 set<Netxx::socket_type> & armed_sessions,
2530 transaction_guard & guard)
2531{
2532 set<Netxx::socket_type> arm_failed;
2533 for (map<Netxx::socket_type,
2534 shared_ptr<session> >::const_iterator i = sessions.begin();
2535 i != sessions.end(); ++i)
2536 {
2537 i->second->maybe_step();
2538 i->second->maybe_say_goodbye(guard);
2539 try
2540 {
2541 if (i->second->arm())
2542 {
2543 L(FL("fd %d is armed") % i->first);
2544 armed_sessions.insert(i->first);
2545 }
2546 probe.add(*i->second->str, i->second->which_events());
2547 }
2548 catch (bad_decode & bd)
2549 {
2550 W(F("protocol error while processing peer %s: '%s', marking as bad")
2551 % i->second->peer_id % bd.what);
2552 arm_failed.insert(i->first);
2553 }
2554 }
2555 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2556 i != arm_failed.end(); ++i)
2557 {
2558 drop_session_associated_with_fd(sessions, *i);
2559 }
2560}
2561
2562static void
2563handle_new_connection(Netxx::Address & addr,
2564 Netxx::StreamServer & server,
2565 Netxx::Timeout & timeout,
2566 protocol_role role,
2567 globish const & include_pattern,
2568 globish const & exclude_pattern,
2569 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2570 app_state & app)
2571{
2572 L(FL("accepting new connection on %s : %s")
2573 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2574 Netxx::Peer client = server.accept_connection();
2575
2576 if (!client)
2577 {
2578 L(FL("accept() returned a dead client"));
2579 }
2580 else
2581 {
2582 P(F("accepted new client connection from %s : %s")
2583 % client.get_address() % lexical_cast<string>(client.get_port()));
2584
2585 // 'false' here means not to revert changes when the SockOpt
2586 // goes out of scope.
2587 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2588 socket_options.set_non_blocking();
2589
2590 shared_ptr<Netxx::Stream> str =
2591 shared_ptr<Netxx::Stream>
2592 (new Netxx::Stream(client.get_socketfd(), timeout));
2593
2594 shared_ptr<session> sess(new session(role, server_voice,
2595 include_pattern, exclude_pattern,
2596 app,
2597 lexical_cast<string>(client), str));
2598 sess->begin_service();
2599 sessions.insert(make_pair(client.get_socketfd(), sess));
2600 }
2601}
2602
2603static void
2604handle_read_available(Netxx::socket_type fd,
2605 shared_ptr<session> sess,
2606 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2607 set<Netxx::socket_type> & armed_sessions,
2608 bool & live_p)
2609{
2610 if (sess->read_some())
2611 {
2612 try
2613 {
2614 if (sess->arm())
2615 armed_sessions.insert(fd);
2616 }
2617 catch (bad_decode & bd)
2618 {
2619 W(F("protocol error while processing peer %s: '%s', disconnecting")
2620 % sess->peer_id % bd.what);
2621 drop_session_associated_with_fd(sessions, fd);
2622 live_p = false;
2623 }
2624 }
2625 else
2626 {
2627 switch (sess->protocol_state)
2628 {
2629 case session::working_state:
2630 P(F("peer %s read failed in working state (error)")
2631 % sess->peer_id);
2632 break;
2633
2634 case session::shutdown_state:
2635 P(F("peer %s read failed in shutdown state "
2636 "(possibly client misreported error)")
2637 % sess->peer_id);
2638 break;
2639
2640 case session::confirmed_state:
2641 P(F("peer %s read failed in confirmed state (success)")
2642 % sess->peer_id);
2643 break;
2644 }
2645 drop_session_associated_with_fd(sessions, fd);
2646 live_p = false;
2647 }
2648}
2649
2650
2651static void
2652handle_write_available(Netxx::socket_type fd,
2653 shared_ptr<session> sess,
2654 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2655 bool & live_p)
2656{
2657 if (!sess->write_some())
2658 {
2659 switch (sess->protocol_state)
2660 {
2661 case session::working_state:
2662 P(F("peer %s write failed in working state (error)")
2663 % sess->peer_id);
2664 break;
2665
2666 case session::shutdown_state:
2667 P(F("peer %s write failed in shutdown state "
2668 "(possibly client misreported error)")
2669 % sess->peer_id);
2670 break;
2671
2672 case session::confirmed_state:
2673 P(F("peer %s write failed in confirmed state (success)")
2674 % sess->peer_id);
2675 break;
2676 }
2677
2678 drop_session_associated_with_fd(sessions, fd);
2679 live_p = false;
2680 }
2681}
2682
2683static void
2684process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2685 set<Netxx::socket_type> & armed_sessions,
2686 transaction_guard & guard)
2687{
2688 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2689 i != armed_sessions.end(); ++i)
2690 {
2691 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2692 j = sessions.find(*i);
2693 if (j == sessions.end())
2694 continue;
2695 else
2696 {
2697 shared_ptr<session> sess = j->second;
2698 if (!sess->process(guard))
2699 {
2700 P(F("peer %s processing finished, disconnecting")
2701 % sess->peer_id);
2702 drop_session_associated_with_fd(sessions, *i);
2703 }
2704 }
2705 }
2706}
2707
2708static void
2709reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2710 unsigned long timeout_seconds)
2711{
2712 // Kill any clients which haven't done any i/o inside the timeout period
2713 // or who have exchanged all items and flushed their output buffers.
2714 set<Netxx::socket_type> dead_clients;
2715 time_t now = ::time(NULL);
2716 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2717 i = sessions.begin(); i != sessions.end(); ++i)
2718 {
2719 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2720 < static_cast<unsigned long>(now))
2721 {
2722 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2723 % i->first % i->second->peer_id);
2724 dead_clients.insert(i->first);
2725 }
2726 }
2727 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2728 i != dead_clients.end(); ++i)
2729 {
2730 drop_session_associated_with_fd(sessions, *i);
2731 }
2732}
2733
2734static void
2735serve_connections(protocol_role role,
2736 globish const & include_pattern,
2737 globish const & exclude_pattern,
2738 app_state & app,
2739 std::list<utf8> const & addresses,
2740 Netxx::port_type default_port,
2741 unsigned long timeout_seconds,
2742 unsigned long session_limit)
2743{
2744 Netxx::PipeCompatibleProbe probe;
2745
2746 Netxx::Timeout
2747 forever,
2748 timeout(static_cast<long>(timeout_seconds)),
2749 instant(0,1);
2750
2751#ifdef USE_IPV6
2752 bool use_ipv6=true;
2753#else
2754 bool use_ipv6=false;
2755#endif
2756 // This will be true when we try to bind while using IPv6. See comments
2757 // further down.
2758 bool try_again=false;
2759
2760 do
2761 {
2762 try
2763 {
2764 try_again = false;
2765
2766 Netxx::Address addr(use_ipv6);
2767
2768 if (addresses.empty())
2769 addr.add_all_addresses(default_port);
2770 else
2771 {
2772 for (std::list<utf8>::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
2773 {
2774 const utf8 & address = *it;
2775 if (!address().empty())
2776 {
2777 size_t l_colon = address().find(':');
2778 size_t r_colon = address().rfind(':');
2779
2780 if (l_colon == r_colon && l_colon == 0)
2781 {
2782 // can't be an IPv6 address as there is only one colon
2783 // must be a : followed by a port
2784 string port_str = address().substr(1);
2785 addr.add_all_addresses(std::atoi(port_str.c_str()));
2786 }
2787 else
2788 addr.add_address(address().c_str(), default_port);
2789 }
2790 }
2791 }
2792
2793 // If se use IPv6 and the initialisation of server fails, we want
2794 // to try again with IPv4. The reason is that someone may have
2795 // downloaded a IPv6-enabled monotone on a system that doesn't
2796 // have IPv6, and which might fail therefore.
2797 // On failure, Netxx::NetworkException is thrown, and we catch
2798 // it further down.
2799 try_again=use_ipv6;
2800
2801 Netxx::StreamServer server(addr, timeout);
2802
2803 // If we came this far, whatever we used (IPv6 or IPv4) was
2804 // accepted, so we don't need to try again any more.
2805 try_again=false;
2806
2807 const char *name = addr.get_name();
2808 P(F("beginning service on %s : %s")
2809 % (name != NULL ? name : _("<all interfaces>"))
2810 % lexical_cast<string>(addr.get_port()));
2811
2812 map<Netxx::socket_type, shared_ptr<session> > sessions;
2813 set<Netxx::socket_type> armed_sessions;
2814
2815 shared_ptr<transaction_guard> guard;
2816
2817 while (true)
2818 {
2819 probe.clear();
2820 armed_sessions.clear();
2821
2822 if (sessions.size() >= session_limit)
2823 W(F("session limit %d reached, some connections "
2824 "will be refused") % session_limit);
2825 else
2826 probe.add(server);
2827
2828 if (!guard)
2829 guard = shared_ptr<transaction_guard>(new transaction_guard(app.db));
2830
2831 I(guard);
2832
2833 while (!server_initiated_sync_requests.empty())
2834 {
2835 server_initiated_sync_request request
2836 = server_initiated_sync_requests.front();
2837 server_initiated_sync_requests.pop_front();
2838
2839 utf8 addr(request.address);
2840 globish inc(request.include);
2841 globish exc(request.exclude);
2842
2843 try
2844 {
2845 P(F("connecting to %s") % addr());
2846 shared_ptr<Netxx::StreamBase> server
2847 = build_stream_to_server(app, inc, exc,
2848 addr, default_port,
2849 timeout);
2850
2851 // 'false' here means not to revert changes when
2852 // the SockOpt goes out of scope.
2853 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2854 socket_options.set_non_blocking();
2855
2856 protocol_role role = source_and_sink_role;
2857 if (request.what == "sync")
2858 role = source_and_sink_role;
2859 else if (request.what == "push")
2860 role = source_role;
2861 else if (request.what == "pull")
2862 role = sink_role;
2863
2864 shared_ptr<session> sess(new session(role, client_voice,
2865 inc, exc,
2866 app, addr(), server, true));
2867
2868 sessions.insert(make_pair(server->get_socketfd(), sess));
2869 }
2870 catch (Netxx::NetworkException & e)
2871 {
2872 P(F("Network error: %s") % e.what());
2873 }
2874 }
2875
2876 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard);
2877
2878 L(FL("i/o probe with %d armed") % armed_sessions.size());
2879 Netxx::socket_type fd;
2880 Netxx::Timeout how_long;
2881 if (sessions.empty())
2882 how_long = forever;
2883 else if (armed_sessions.empty())
2884 how_long = timeout;
2885 else
2886 how_long = instant;
2887 do
2888 {
2889 Netxx::Probe::result_type res = probe.ready(how_long);
2890 how_long = instant;
2891 Netxx::Probe::ready_type event = res.second;
2892 fd = res.first;
2893
2894 if (fd == -1)
2895 {
2896 if (armed_sessions.empty())
2897 L(FL("timed out waiting for I/O (listening on %s : %s)")
2898 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2899 }
2900
2901 // we either got a new connection
2902 else if (fd == server)
2903 handle_new_connection(addr, server, timeout, role,
2904 include_pattern, exclude_pattern,
2905 sessions, app);
2906
2907 // or an existing session woke up
2908 else
2909 {
2910 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2911 i = sessions.find(fd);
2912 if (i == sessions.end())
2913 {
2914 L(FL("got woken up for action on unknown fd %d") % fd);
2915 }
2916 else
2917 {
2918 probe.remove(*(i->second->str));
2919 shared_ptr<session> sess = i->second;
2920 bool live_p = true;
2921
2922 try
2923 {
2924 if (event & Netxx::Probe::ready_read)
2925 handle_read_available(fd, sess, sessions,
2926 armed_sessions, live_p);
2927
2928 if (live_p && (event & Netxx::Probe::ready_write))
2929 handle_write_available(fd, sess, sessions, live_p);
2930 }
2931 catch (Netxx::Exception &)
2932 {
2933 P(F("Network error on peer %s, disconnecting")
2934 % sess->peer_id);
2935 drop_session_associated_with_fd(sessions, fd);
2936 }
2937 if (live_p && (event & Netxx::Probe::ready_oobd))
2938 {
2939 P(F("got OOB from peer %s, disconnecting")
2940 % sess->peer_id);
2941 drop_session_associated_with_fd(sessions, fd);
2942 }
2943 }
2944 }
2945 }
2946 while (fd != -1);
2947 process_armed_sessions(sessions, armed_sessions, *guard);
2948 reap_dead_sessions(sessions, timeout_seconds);
2949
2950 if (sessions.empty())
2951 {
2952 // Let the guard die completely if everything's gone quiet.
2953 guard->commit();
2954 guard.reset();
2955 }
2956 }
2957 }
2958 // This exception is thrown when bind() fails somewhere in Netxx.
2959 catch (Netxx::NetworkException &)
2960 {
2961 // If we tried with IPv6 and failed, we want to try again using IPv4.
2962 if (try_again)
2963 {
2964 use_ipv6 = false;
2965 }
2966 // In all other cases, just rethrow the exception.
2967 else
2968 throw;
2969 }
2970 // This exception is thrown when there is no support for the type of
2971 // connection we want to do in the kernel, for example when a socket()
2972 // call fails somewhere in Netxx.
2973 catch (Netxx::Exception &)
2974 {
2975 // If we tried with IPv6 and failed, we want to try again using IPv4.
2976 if (try_again)
2977 {
2978 use_ipv6 = false;
2979 }
2980 // In all other cases, just rethrow the exception.
2981 else
2982 throw;
2983 }
2984 }
2985 while(try_again);
2986 }
2987
2988static void
2989serve_single_connection(shared_ptr<session> sess,
2990 unsigned long timeout_seconds)
2991{
2992 Netxx::PipeCompatibleProbe probe;
2993
2994 Netxx::Timeout
2995 forever,
2996 timeout(static_cast<long>(timeout_seconds)),
2997 instant(0,1);
2998
2999 P(F("beginning service on %s") % sess->peer_id);
3000
3001 sess->begin_service();
3002
3003 transaction_guard guard(sess->app.db);
3004
3005 map<Netxx::socket_type, shared_ptr<session> > sessions;
3006 set<Netxx::socket_type> armed_sessions;
3007
3008 if (sess->str->get_socketfd() == -1)
3009 {
3010 // Unix pipes are non-duplex, have two filedescriptors
3011 shared_ptr<Netxx::PipeStream> pipe =
3012 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
3013 I(pipe);
3014 sessions[pipe->get_writefd()]=sess;
3015 sessions[pipe->get_readfd()]=sess;
3016 }
3017 else
3018 sessions[sess->str->get_socketfd()]=sess;
3019
3020 while (!sessions.empty())
3021 {
3022 probe.clear();
3023 armed_sessions.clear();
3024
3025 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, guard);
3026
3027 L(FL("i/o probe with %d armed") % armed_sessions.size());
3028 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
3029 : instant));
3030 Netxx::Probe::ready_type event = res.second;
3031 Netxx::socket_type fd = res.first;
3032
3033 if (fd == -1)
3034 {
3035 if (armed_sessions.empty())
3036 L(FL("timed out waiting for I/O (listening on %s)")
3037 % sess->peer_id);
3038 }
3039
3040 // an existing session woke up
3041 else
3042 {
3043 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3044 i = sessions.find(fd);
3045 if (i == sessions.end())
3046 {
3047 L(FL("got woken up for action on unknown fd %d") % fd);
3048 }
3049 else
3050 {
3051 shared_ptr<session> sess = i->second;
3052 bool live_p = true;
3053
3054 if (event & Netxx::Probe::ready_read)
3055 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3056
3057 if (live_p && (event & Netxx::Probe::ready_write))
3058 handle_write_available(fd, sess, sessions, live_p);
3059
3060 if (live_p && (event & Netxx::Probe::ready_oobd))
3061 {
3062 P(F("got some OOB data on fd %d (peer %s), disconnecting")
3063 % fd % sess->peer_id);
3064 drop_session_associated_with_fd(sessions, fd);
3065 }
3066 }
3067 }
3068 process_armed_sessions(sessions, armed_sessions, guard);
3069 reap_dead_sessions(sessions, timeout_seconds);
3070 }
3071}
3072
3073
3074void
3075insert_with_parents(revision_id rev,
3076 refiner & ref,
3077 revision_enumerator & rev_enumerator,
3078 set<revision_id> & revs,
3079 app_state & app,
3080 ticker & revisions_ticker)
3081{
3082 deque<revision_id> work;
3083 work.push_back(rev);
3084 while (!work.empty())
3085 {
3086 revision_id rid = work.front();
3087 work.pop_front();
3088
3089 if (!null_id(rid) && revs.find(rid) == revs.end())
3090 {
3091 revs.insert(rid);
3092 ++revisions_ticker;
3093 id rev_item;
3094 decode_hexenc(rid.inner(), rev_item);
3095 ref.note_local_item(rev_item);
3096 vector<revision_id> parents;
3097 rev_enumerator.get_revision_parents(rid, parents);
3098 for (vector<revision_id>::const_iterator i = parents.begin();
3099 i != parents.end(); ++i)
3100 {
3101 work.push_back(*i);
3102 }
3103 }
3104 }
3105}
3106
3107void
3108session::rebuild_merkle_trees(app_state & app,
3109 set<branch_name> const & branchnames)
3110{
3111 P(F("finding items to synchronize:"));
3112 for (set<branch_name>::const_iterator i = branchnames.begin();
3113 i != branchnames.end(); ++i)
3114 L(FL("including branch %s") % *i);
3115
3116 // xgettext: please use short message and try to avoid multibytes chars
3117 ticker revisions_ticker(N_("revisions"), "r", 64);
3118 // xgettext: please use short message and try to avoid multibytes chars
3119 ticker certs_ticker(N_("certificates"), "c", 256);
3120 // xgettext: please use short message and try to avoid multibytes chars
3121 ticker keys_ticker(N_("keys"), "k", 1);
3122
3123 set<revision_id> revision_ids;
3124 set<rsa_keypair_id> inserted_keys;
3125
3126 {
3127 for (set<branch_name>::const_iterator i = branchnames.begin();
3128 i != branchnames.end(); ++i)
3129 {
3130 // Get branch certs.
3131 vector< revision<cert> > certs;
3132 // FIXME_PROJECTS: probably something like
3133 // app.get_project(i->project).get_branch_certs(i->branch)
3134 // or so.
3135 app.get_project().get_branch_certs(*i, certs);
3136 for (vector< revision<cert> >::const_iterator j = certs.begin();
3137 j != certs.end(); j++)
3138 {
3139 revision_id rid(j->inner().ident);
3140 insert_with_parents(rid, rev_refiner, rev_enumerator,
3141 revision_ids, app, revisions_ticker);
3142 // Branch certs go in here, others later on.
3143 hexenc<id> tmp;
3144 id item;
3145 cert_hash_code(j->inner(), tmp);
3146 decode_hexenc(tmp, item);
3147 cert_refiner.note_local_item(item);
3148 rev_enumerator.note_cert(rid, tmp);
3149 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3150 inserted_keys.insert(j->inner().key);
3151 }
3152 }
3153 }
3154
3155 {
3156 map<branch_name, epoch_data> epochs;
3157 app.db.get_epochs(epochs);
3158
3159 epoch_data epoch_zero(string(constants::epochlen, '0'));
3160 for (set<branch_name>::const_iterator i = branchnames.begin();
3161 i != branchnames.end(); ++i)
3162 {
3163 branch_name const & branch(*i);
3164 map<branch_name, epoch_data>::const_iterator j;
3165 j = epochs.find(branch);
3166
3167 // Set to zero any epoch which is not yet set.
3168 if (j == epochs.end())
3169 {
3170 L(FL("setting epoch on %s to zero") % branch);
3171 epochs.insert(make_pair(branch, epoch_zero));
3172 app.db.set_epoch(branch, epoch_zero);
3173 }
3174
3175 // Then insert all epochs into merkle tree.
3176 j = epochs.find(branch);
3177 I(j != epochs.end());
3178 epoch_id eid;
3179 id epoch_item;
3180 epoch_hash_code(j->first, j->second, eid);
3181 decode_hexenc(eid.inner(), epoch_item);
3182 epoch_refiner.note_local_item(epoch_item);
3183 }
3184 }
3185
3186 {
3187 typedef vector< pair<hexenc<id>,
3188 pair<revision_id, rsa_keypair_id> > > cert_idx;
3189
3190 cert_idx idx;
3191 app.db.get_revision_cert_nobranch_index(idx);
3192
3193 // Insert all non-branch certs reachable via these revisions
3194 // (branch certs were inserted earlier).
3195
3196 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3197 {
3198 hexenc<id> const & hash = i->first;
3199 revision_id const & ident = i->second.first;
3200 rsa_keypair_id const & key = i->second.second;
3201
3202 rev_enumerator.note_cert(ident, hash);
3203
3204 if (revision_ids.find(ident) == revision_ids.end())
3205 continue;
3206
3207 id item;
3208 decode_hexenc(hash, item);
3209 cert_refiner.note_local_item(item);
3210 ++certs_ticker;
3211 if (inserted_keys.find(key) == inserted_keys.end())
3212 inserted_keys.insert(key);
3213 }
3214 }
3215
3216 // Add any keys specified on the command line.
3217 for (vector<rsa_keypair_id>::const_iterator key
3218 = app.opts.keys_to_push.begin();
3219 key != app.opts.keys_to_push.end(); ++key)
3220 {
3221 if (inserted_keys.find(*key) == inserted_keys.end())
3222 {
3223 if (!app.db.public_key_exists(*key))
3224 {
3225 if (app.keys.key_pair_exists(*key))
3226 app.keys.ensure_in_database(*key);
3227 else
3228 W(F("Cannot find key '%s'") % *key);
3229 }
3230 inserted_keys.insert(*key);
3231 }
3232 }
3233
3234 // Insert all the keys.
3235 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3236 key != inserted_keys.end(); key++)
3237 {
3238 if (app.db.public_key_exists(*key))
3239 {
3240 base64<rsa_pub_key> pub_encoded;
3241 app.db.get_key(*key, pub_encoded);
3242 hexenc<id> keyhash;
3243 key_hash_code(*key, pub_encoded, keyhash);
3244 L(FL("noting key '%s' = '%s' to send") % *key % keyhash);
3245 id key_item;
3246 decode_hexenc(keyhash, key_item);
3247 key_refiner.note_local_item(key_item);
3248 ++keys_ticker;
3249 }
3250 }
3251
3252 rev_refiner.reindex_local_items();
3253 cert_refiner.reindex_local_items();
3254 key_refiner.reindex_local_items();
3255 epoch_refiner.reindex_local_items();
3256}
3257
3258void
3259run_netsync_protocol(protocol_voice voice,
3260 protocol_role role,
3261 std::list<utf8> const & addrs,
3262 globish const & include_pattern,
3263 globish const & exclude_pattern,
3264 app_state & app)
3265{
3266 if (include_pattern().find_first_of("'\"") != string::npos)
3267 {
3268 W(F("include branch pattern contains a quote character:\n"
3269 "%s") % include_pattern());
3270 }
3271
3272 if (exclude_pattern().find_first_of("'\"") != string::npos)
3273 {
3274 W(F("exclude branch pattern contains a quote character:\n"
3275 "%s") % exclude_pattern());
3276 }
3277
3278 // We do not want to be killed by SIGPIPE from a network disconnect.
3279 ignore_sigpipe();
3280
3281 try
3282 {
3283 if (voice == server_voice)
3284 {
3285 if (app.opts.bind_stdio)
3286 {
3287 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3288 shared_ptr<session> sess(new session(role, server_voice,
3289 include_pattern, exclude_pattern,
3290 app, "stdio", str));
3291 serve_single_connection(sess,constants::netsync_timeout_seconds);
3292 }
3293 else
3294 serve_connections(role, include_pattern, exclude_pattern, app,
3295 addrs, static_cast<Netxx::port_type>(constants::netsync_default_port),
3296 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3297 static_cast<unsigned long>(constants::netsync_connection_limit));
3298 }
3299 else
3300 {
3301 I(voice == client_voice);
3302 call_server(role, include_pattern, exclude_pattern, app,
3303 addrs, static_cast<Netxx::port_type>(constants::netsync_default_port),
3304 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3305 }
3306 }
3307 catch (Netxx::NetworkException & e)
3308 {
3309 throw informative_failure((F("network error: %s") % e.what()).str());
3310 }
3311 catch (Netxx::Exception & e)
3312 {
3313 throw oops((F("network error: %s") % e.what()).str());;
3314 }
3315}
3316
3317// Local Variables:
3318// mode: C++
3319// fill-column: 76
3320// c-file-style: "gnu"
3321// indent-tabs-mode: nil
3322// End:
3323// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status