// monotone -- Mtn Source Tree -- Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <map>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include "lexical_cast.hh"
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24
25#include "app_state.hh"
26#include "cert.hh"
27#include "constants.hh"
28#include "enumerator.hh"
29#include "keys.hh"
30#include "lua.hh"
31#include "merkle_tree.hh"
32#include "netcmd.hh"
33#include "netio.hh"
34#include "numeric_vocab.hh"
35#include "refiner.hh"
36#include "revision.hh"
37#include "sanity.hh"
38#include "transforms.hh"
39#include "ui.hh"
40#include "xdelta.hh"
41#include "epoch.hh"
42#include "platform.hh"
43#include "hmac.hh"
44#include "globish.hh"
45#include "uri.hh"
46
47#include "botan/botan.h"
48
49#include "netxx/address.h"
50#include "netxx/peer.h"
51#include "netxx/probe.h"
52#include "netxx/socket.h"
53#include "netxx/sockopt.h"
54#include "netxx/stream.h"
55#include "netxx/streamserver.h"
56#include "netxx/timeout.h"
57#include "netxx_pipe.hh"
58// TODO: things to do that will break protocol compatibility
59// -- need some way to upgrade anonymous to keyed pull, without user having
60// to explicitly specify which they want
61// just having a way to respond "access denied, try again" might work
62// but perhaps better to have the anonymous command include a note "I
63// _could_ use key <...> if you prefer", and if that would lead to more
64// access, could reply "I do prefer". (Does this lead to too much
65// information exposure? Allows anonymous people to probe what branches
66// a key has access to.)
67// -- "warning" packet type?
68// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
69// access to all of it, you just get the parts you have access to
70// (maybe with warnings about skipped branches). to do this right,
71// should have a way for the server to send back to the client "right,
72// you're not getting the following branches: ...", so the client will
73// not include them in its merkle trie.
74// -- add some sort of vhost field to the client's first packet, saying who
75// they expect to talk to
76
77//
78// This is the "new" network synchronization (netsync) system in
79// monotone. It is based on synchronizing pairs of merkle trees over an
80// interactive connection.
81//
82// A netsync process between peers treats each peer as either a source, a
// sink, or both. When a peer is only a source, it will not write any new
// items to its database. When a peer is only a sink, it will not send any
// items from its database. When a peer is both a source and sink, it may
// send and write items freely.
87//
88// The post-state of a netsync is that each sink contains a superset of the
89// items in its corresponding source; when peers are behaving as both
90// source and sink, this means that the post-state of the sync is for the
91// peers to have identical item sets.
92//
93//
94// Data structure
95// --------------
96//
// Each node in a merkle tree contains a fixed number of slots. This number
98// is derived from a global parameter of the protocol -- the tree fanout --
99// such that the number of slots is 2^fanout. For now we will assume that
100// fanout is 4 thus there are 16 slots in a node, because this makes
101// illustration easier. The other parameter of the protocol is the size of
102// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
103//
104// Each slot in a merkle tree node is in one of 3 states:
105//
106// - empty
107// - leaf
108// - subtree
109//
110// In addition, each leaf contains a hash code which identifies an element
111// of the set being synchronized. Each subtree slot contains a hash code of
112// the node immediately beneath it in the merkle tree. Empty slots contain
113// no hash codes.
114//
115// Since empty slots have no hash code, they are represented implicitly by
116// a bitmap at the head of each merkle tree node. As an additional
117// integrity check, each merkle tree node contains a label indicating its
118// prefix in the tree, and a hash of its own contents.
119//
120// In total, then, the byte-level representation of a <160,4> merkle tree
121// node is as follows:
122//
123// 20 bytes - hash of the remaining bytes in the node
124// 1 byte - type of this node (manifest, file, key, mcert, fcert)
125// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
126// 0-20 bytes - the prefix of this node, 4 bits * level,
127// rounded up to a byte
128// 1-N bytes - number of leaves under this node (uleb128)
129// 4 bytes - slot-state bitmap of the node
130// 0-320 bytes - between 0 and 16 live slots in the node
131//
132// So, in the worst case such a node is 367 bytes, with these parameters.
133//
134//
135// Protocol
136// --------
137//
138// The protocol is a binary command-packet system over TCP; each packet
139// consists of a single byte which identifies the protocol version, a byte
140// which identifies the command name inside that version, a size_t sent as
141// a uleb128 indicating the length of the packet, that many bytes of
142// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
143// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
144// 20-byte random key chosen by the client after authentication (discussed
145// below). Decoding involves simply buffering until a sufficient number of
146// bytes are received, then advancing the buffer pointer. Any time an
147// integrity check (the HMAC) fails, the protocol is assumed to have lost
148// synchronization, and the connection is dropped. The parties are free to
149// drop the TCP stream at any point, if too much data is received or too
150// much idle time passes; no commitments or transactions are made.
151//
152//
153// Authentication and setup
154// ------------------------
155//
156// The exchange begins in a non-authenticated state. The server sends a
157// "hello <id> <nonce>" command, which identifies the server's RSA key and
158// issues a nonce which must be used for a subsequent authentication.
159//
160// The client then responds with either:
161//
162// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
163// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
164// role it wishes to play in the synchronization, identifies the pattern it
165// wishes to sync with, signs the previous nonce with its own key, and informs
166// the server of the HMAC key it wishes to use for this session (encrypted
167// with the server's public key); or
168//
169// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
170// <hmac key>" command, which identifies the role it wishes to play in the
171// synchronization, the pattern it wishes to sync with, and the HMAC key it
172// wishes to use for this session (also encrypted with the server's public
173// key).
174//
175// The server then replies with a "confirm" command, which contains no
176// other data but will only have the correct HMAC integrity code if the
177// server received and properly decrypted the HMAC key offered by the
178// client. This transitions the peers into an authenticated state and
179// begins epoch refinement. If epoch refinement and epoch transmission
180// succeed, the peers switch to data refinement and data transmission.
181//
182//
183// Refinement
184// ----------
185//
186// Refinement is executed by "refiners"; there is a refiner for each
187// set of 'items' being exchanged: epochs, keys, certs, and revisions.
188// When refinement starts, each party knows only their own set of
189// items; when refinement completes, each party has learned of the
190// complete set of items it needs to send, and a count of items it's
191// expecting to receive.
192//
193// For more details on the refinement process, see refiner.cc.
194//
195//
196// Transmission
197// ------------
198//
199// Once the set of items to send has been determined (for keys, certs, and
200// revisions) each peer switches into a transmission mode. This mode
201// involves walking the revision graph in ancestry-order and sending all
202// the items the local peer has which the remote one does not. Since the
203// remote and local peers both know all the items which need to be
204// transferred (they learned during refinement) they know what to wait for
205// and what to send. The mechanisms of the transmission phase (notably,
206// enumerator.cc) simply ensure that things are sent in the proper order,
207// and without over-filling the output buffer too much.
208//
209//
210// Shutdown
211// --------
212//
213// After transmission completes, one special command, "bye", is used to
214// shut down a connection gracefully. The shutdown sequence based on "bye"
215// commands is documented below in session::process_bye_cmd.
216//
217//
218// Note on epochs
219// --------------
220//
// One refinement and transmission phase precedes all the others: epochs.
222// Epochs are exchanged and compared in order to be sure that further
223// refinement and transmission (on certs and revisions) makes sense; they
224// are a sort of "immune system" to prevent incompatible databases (say
225// between rebuilds due to bugs in monotone) from cross-contaminating. The
226// later refinements are only kicked off *after* all epochs are received
227// and compare correctly.
228//
229//
230// Note on dense coding
231// --------------------
232//
233// This protocol is "raw binary" (non-text) because coding density is
234// actually important here, and each packet consists of very
235// information-dense material that you wouldn't have a hope of typing in,
// or interpreting manually anyway.
237//
238
239using std::auto_ptr;
240using std::deque;
241using std::make_pair;
242using std::map;
243using std::min;
244using std::pair;
245using std::set;
246using std::string;
247using std::vector;
248
249using boost::shared_ptr;
250using boost::lexical_cast;
251
// One sync that a Lua hook asked the server to initiate. The four string
// fields hold, in order, the four arguments given to the Lua extension
// server_request_sync (see LUAEXT(server_request_sync) below).
struct server_initiated_sync_request
{
  std::string what;     // 1st argument of server_request_sync
  std::string address;  // 2nd argument
  std::string include;  // 3rd argument
  std::string exclude;  // 4th argument
};

// Pending server-initiated sync requests; new requests are appended at the
// back by server_request_sync.
std::deque<server_initiated_sync_request> server_initiated_sync_requests;
260LUAEXT(server_request_sync, )
261{
262 char const * w = luaL_checkstring(L, 1);
263 char const * a = luaL_checkstring(L, 2);
264 char const * i = luaL_checkstring(L, 3);
265 char const * e = luaL_checkstring(L, 4);
266 server_initiated_sync_request request;
267 request.what = string(w);
268 request.address = string(a);
269 request.include = string(i);
270 request.exclude = string(e);
271 server_initiated_sync_requests.push_back(request);
272 return 0;
273}
274
275static inline void
276require(bool check, string const & context)
277{
278 if (!check)
279 throw bad_decode(F("check of '%s' failed") % context);
280}
281
// Exception type carrying a human-readable description of a netsync
// failure; thrown, for example, by session::error().
struct netsync_error
{
  std::string msg;

  netsync_error(std::string const & message)
    : msg(message)
  {}
};
287
// All state for one netsync connection with a single peer (identified by
// peer_id and talked to over the stream str). The session also serves as
// the callback object for its refiners and its revision enumerator, which
// is why it derives from both callback interfaces.
struct
session:
  public refiner_callbacks,
  public enumerator_callbacks
{
  // Our role in this sync (source, sink, or both). Unlike voice it is not
  // const, so it can change after construction.
  protocol_role role;
  // Whether we speak as client or server (compared against client_voice).
  protocol_voice const voice;
  // Branch patterns we sync, and the matcher built from them.
  globish our_include_pattern;
  globish our_exclude_pattern;
  globish_matcher our_matcher;
  app_state & app;

  // Printable peer identifier, used in log messages.
  string peer_id;
  // The connection stream; read by read_some(), written by write_some().
  shared_ptr<Netxx::StreamBase> str;

  // Raw bytes received from the peer (appended by read_some()).
  string_queue inbuf;
  // Encoded netcmds waiting to go out: deque of pair<string data,
  // size_t cur_pos>, where cur_pos counts bytes of data already written.
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  // NOTE(review): presumably the command being decoded from inbuf; while
  // `armed` is true which_events() stops asking for more input. arm() is
  // defined elsewhere -- confirm.
  netcmd cmd;
  bool armed;
  bool arm();

  // Identity of the remote peer's key (empty until known).
  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  // Session key and per-direction chained HMACs. All start from
  // constants::netsync_key_initializer; set_session_key() replaces them.
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  // Time of the last successful read or write (see mark_recent_io()).
  time_t last_io_time;
  // Progress tickers. setup_client_tickers() creates only some of them,
  // so each may be NULL.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  // Transfer counters, passed to the note_netsync_end hook by ~session().
  size_t bytes_in, bytes_out;
  size_t certs_in, certs_out;
  size_t revs_in, revs_out;
  size_t keys_in, keys_out;
  // used to identify this session to the netsync hooks.
  // We can't just use saved_nonce, because that's blank for all
  // anonymous connections and could lead to confusion.
  size_t session_id;
  // Source of session_id values (pre-incremented by the constructor).
  static size_t session_count;

  // Items reported to the Lua note_netsync_* hooks by ~session(); presumably
  // the items this session wrote to the local database -- confirm where
  // they are filled.
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  // Nonce generated by mk_nonce(); empty until then.
  id saved_nonce;

  // Coarse protocol progress. ~session() treats confirmed_state as a
  // clean, error-free finish.
  enum
    {
      working_state,
      shutdown_state,
      confirmed_state
    }
    protocol_state;

  // True in "error unwind" mode: incoming bytes are discarded and no new
  // netcmds are queued (see read_some / write_netcmd_and_try_flush).
  bool encountered_error;

  // Status codes that may be stored in error_code.
  static const int no_error = 200;
  static const int partial_transfer = 211;
  static const int no_transfer = 212;

  static const int not_permitted = 412;
  static const int unknown_key = 422;
  static const int mixing_versions = 432;

  static const int role_mismatch = 512;
  static const int bad_command = 521;

  static const int failed_identification = 532;
  //static const int bad_data = 541;

  // Outcome passed to the note_netsync_end hook; starts as no_transfer.
  int error_code;

  // True once done_all_refinements() has given the tickers their totals.
  bool set_totals;

  // Interface to refinement.
  refiner epoch_refiner;
  refiner key_refiner;
  refiner cert_refiner;
  refiner rev_refiner;

  // Interface to ancestry grovelling.
  revision_enumerator rev_enumerator;

  // Enumerator_callbacks methods.
  // Files already queued to the peer, so each is sent at most once.
  set<file_id> file_items_sent;
  bool process_this_rev(revision_id const & rev);
  bool queue_this_cert(hexenc<id> const & c);
  bool queue_this_file(hexenc<id> const & f);
  void note_file_data(file_id const & f);
  void note_file_delta(file_id const & src, file_id const & dst);
  void note_rev(revision_id const & rev);
  void note_cert(hexenc<id> const & c);

  session(protocol_role role,
          protocol_voice voice,
          globish const & our_include_pattern,
          globish const & our_exclude_pattern,
          app_state & app,
          string const & peer,
          shared_ptr<Netxx::StreamBase> sock,
          bool initiated_by_server = false);

  virtual ~session();

  id mk_nonce();
  void mark_recent_io();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();
  bool done_all_refinements();
  bool queued_all_items();
  bool received_all_items();
  bool finished_working();
  void maybe_step();
  void maybe_say_goodbye(transaction_guard & guard);

  void note_item_arrived(netcmd_item_type ty, id const & i);
  void maybe_note_epochs_finished();
  void note_item_sent(netcmd_item_type ty, id const & i);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  void error(int errcode, string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // Outgoing queue-writers.
  void queue_bye_cmd(u8 phase);
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(netcmd_item_type type, size_t n_items);
  void queue_hello_cmd(rsa_keypair_id const & key_name,
                       base64<rsa_pub_key> const & pub_encoded,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           globish const & include_pattern,
                           globish const & exclude_pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      globish const & include_pattern,
                      globish const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(refinement_type ty, merkle_node const & node);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);

  // Incoming dispatch-called methods.
  bool process_error_cmd(string const & errmsg);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_bye_cmd(u8 phase, transaction_guard & guard);
  bool process_anonymous_cmd(protocol_role role,
                             globish const & their_include_pattern,
                             globish const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        globish const & their_include_pattern,
                        globish const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  bool process_refine_cmd(refinement_type ty, merkle_node const & node);
  bool process_done_cmd(netcmd_item_type type, size_t n_items);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_usher_cmd(utf8 const & msg);

  // The incoming dispatcher.
  bool dispatch_payload(netcmd const & cmd,
                        transaction_guard & guard);

  // Various helpers.
  void assume_corresponding_role(protocol_role their_role);
  void respond_to_confirm_cmd();
  bool data_exists(netcmd_item_type type,
                   id const & item);
  void load_data(netcmd_item_type type,
                 id const & item,
                 string & out);

  void rebuild_merkle_trees(app_state & app,
                            set<branch_name> const & branches);

  void send_all_data(netcmd_item_type ty, set<id> const & items);
  void begin_service();
  bool process(transaction_guard & guard);

  bool initiated_by_server;
};
// Shared counter behind session_id; the first session gets id 1.
size_t session::session_count = 0;
508
// Build a session for one connection. Peer-supplied state (remote key,
// nonce) starts empty. The session key and both HMACs start from
// constants::netsync_key_initializer until set_session_key() replaces them.
// Every transfer counter starts at zero, and each session takes a fresh
// session_id from session_count.
session::session(protocol_role role,
                 protocol_voice voice,
                 globish const & our_include_pattern,
                 globish const & our_exclude_pattern,
                 app_state & app,
                 string const & peer,
                 shared_ptr<Netxx::StreamBase> sock,
                 bool initiated_by_server) :
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  // The arguments here are the constructor parameters (they shadow the
  // members of the same name).
  our_matcher(our_include_pattern, our_exclude_pattern),
  app(app),
  peer_id(peer),
  str(sock),
  inbuf(),
  outbuf_size(0),
  armed(false),
  remote_peer_key_hash(""),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  // app.opts.use_transport_auth is handed to chained_hmac; presumably it
  // switches HMAC checking on or off -- confirm in hmac.hh.
  read_hmac(netsync_session_key(constants::netsync_key_initializer),
            app.opts.use_transport_auth),
  write_hmac(netsync_session_key(constants::netsync_key_initializer),
             app.opts.use_transport_auth),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  bytes_in(0), bytes_out(0),
  certs_in(0), certs_out(0),
  revs_in(0), revs_out(0),
  keys_in(0), keys_out(0),
  session_id(++session_count),
  saved_nonce(""),
  protocol_state(working_state),
  encountered_error(false),
  // Pessimistic default; ~session() upgrades it to partial_transfer or no_error.
  error_code(no_transfer),
  set_totals(false),
  // *this is passed as the callbacks object for each refiner and for the
  // revision enumerator.
  epoch_refiner(epoch_item, voice, *this),
  key_refiner(key_item, voice, *this),
  cert_refiner(cert_item, voice, *this),
  rev_refiner(revision_item, voice, *this),
  rev_enumerator(*this, app),
  initiated_by_server(initiated_by_server)
{}
560
// Tear down the session and report its outcome to the Lua hooks.
//
// First the final error_code is settled. A session that reached
// confirmed_state counts as no_error. A session still at no_transfer that
// moved any revisions, certs or keys in either direction becomes
// partial_transfer. Then, if anything was recorded in written_keys,
// written_revisions or written_certs, the per-item note_netsync_* hooks
// run: each written revision is reported together with the written certs
// attached to it, and the remaining certs are reported one by one. Finally
// hook_note_netsync_end receives the totals.
//
// NOTE(review): this destructor reads the database and calls Lua hooks,
// which may presumably throw. An exception leaving a destructor during
// stack unwinding calls std::terminate, and since C++11 destructors are
// implicitly noexcept. Consider guarding this body.
session::~session()
{
  if (protocol_state == confirmed_state)
    error_code = no_error;
  else if (error_code == no_transfer &&
           (revs_in || revs_out ||
            certs_in || certs_out ||
            keys_in || keys_out))
    error_code = partial_transfer;

  // Group the written certs by revision. Certs whose revision was not
  // written in this session go to unattached_certs.
  vector<cert> unattached_certs;
  map<revision_id, vector<cert> > revcerts;
  for (vector<revision_id>::iterator i = written_revisions.begin();
       i != written_revisions.end(); ++i)
    revcerts.insert(make_pair(*i, vector<cert>()));
  for (vector<cert>::iterator i = written_certs.begin();
       i != written_certs.end(); ++i)
    {
      map<revision_id, vector<cert> >::iterator j;
      j = revcerts.find(revision_id(i->ident));
      if (j == revcerts.end())
        unattached_certs.push_back(*i);
      else
        j->second.push_back(*i);
    }

  // if (role == sink_role || role == source_and_sink_role)
  if (!written_keys.empty()
      || !written_revisions.empty()
      || !written_certs.empty())
    {

      //Keys
      for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
           i != written_keys.end(); ++i)
        {
          app.lua.hook_note_netsync_pubkey_received(*i, session_id);
        }

      //Revisions
      for (vector<revision_id>::iterator i = written_revisions.begin();
           i != written_revisions.end(); ++i)
        {
          // Every written revision was inserted into revcerts above, so
          // operator[] only looks up an existing entry here.
          vector<cert> & ctmp(revcerts[*i]);
          set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
          for (vector<cert>::const_iterator j = ctmp.begin();
               j != ctmp.end(); ++j)
            {
              // j->value is base64-encoded; the hook receives the decoded value.
              cert_value vtmp;
              decode_base64(j->value, vtmp);
              certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
            }
          revision_data rdat;
          app.db.get_revision(*i, rdat);
          app.lua.hook_note_netsync_revision_received(*i, rdat, certs,
                                                      session_id);
        }

      //Certs (not attached to a new revision)
      for (vector<cert>::iterator i = unattached_certs.begin();
           i != unattached_certs.end(); ++i)
        {
          cert_value tmp;
          decode_base64(i->value, tmp);
          app.lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
                                                  i->name, tmp, session_id);
        }
    }
  app.lua.hook_note_netsync_end(session_id, error_code,
                                bytes_in, bytes_out,
                                certs_in, certs_out,
                                revs_in, revs_out,
                                keys_in, keys_out);
}
635
636bool
637session::process_this_rev(revision_id const & rev)
638{
639 id item;
640 decode_hexenc(rev.inner(), item);
641 return (rev_refiner.items_to_send.find(item)
642 != rev_refiner.items_to_send.end());
643}
644
645bool
646session::queue_this_cert(hexenc<id> const & c)
647{
648 id item;
649 decode_hexenc(c, item);
650 return (cert_refiner.items_to_send.find(item)
651 != cert_refiner.items_to_send.end());
652}
653
654bool
655session::queue_this_file(hexenc<id> const & f)
656{
657 return file_items_sent.find(file_id(f)) == file_items_sent.end();
658}
659
660void
661session::note_file_data(file_id const & f)
662{
663 if (role == sink_role)
664 return;
665 file_data fd;
666 id item;
667 decode_hexenc(f.inner(), item);
668 app.db.get_file_version(f, fd);
669 queue_data_cmd(file_item, item, fd.inner()());
670 file_items_sent.insert(f);
671}
672
673void
674session::note_file_delta(file_id const & src, file_id const & dst)
675{
676 if (role == sink_role)
677 return;
678 file_delta fdel;
679 id fid1, fid2;
680 decode_hexenc(src.inner(), fid1);
681 decode_hexenc(dst.inner(), fid2);
682 app.db.get_arbitrary_file_delta(src, dst, fdel);
683 queue_delta_cmd(file_item, fid1, fid2, fdel.inner());
684 file_items_sent.insert(dst);
685}
686
687void
688session::note_rev(revision_id const & rev)
689{
690 if (role == sink_role)
691 return;
692 revision_t rs;
693 id item;
694 decode_hexenc(rev.inner(), item);
695 app.db.get_revision(rev, rs);
696 data tmp;
697 write_revision(rs, tmp);
698 queue_data_cmd(revision_item, item, tmp());
699}
700
701void
702session::note_cert(hexenc<id> const & c)
703{
704 if (role == sink_role)
705 return;
706 id item;
707 decode_hexenc(c, item);
708 revision<cert> cert;
709 string str;
710 app.db.get_revision_cert(c, cert);
711 write_cert(cert.inner(), str);
712 queue_data_cmd(cert_item, item, str);
713}
714
715
716id
717session::mk_nonce()
718{
719 I(this->saved_nonce().size() == 0);
720 char buf[constants::merkle_hash_length_in_bytes];
721 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
722 constants::merkle_hash_length_in_bytes);
723 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
724 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
725 return this->saved_nonce;
726}
727
728void
729session::mark_recent_io()
730{
731 last_io_time = ::time(NULL);
732}
733
734void
735session::set_session_key(string const & key)
736{
737 session_key = netsync_session_key(key);
738 read_hmac.set_key(session_key);
739 write_hmac.set_key(session_key);
740}
741
742void
743session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
744{
745 if (app.opts.use_transport_auth)
746 {
747 keypair our_kp;
748 load_key_pair(app, app.opts.signing_key, our_kp);
749 string hmac_key;
750 decrypt_rsa(app.lua, app.opts.signing_key, our_kp.priv,
751 hmac_key_encrypted, hmac_key);
752 set_session_key(hmac_key);
753 }
754}
755
756void
757session::setup_client_tickers()
758{
759 // xgettext: please use short message and try to avoid multibytes chars
760 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
761 // xgettext: please use short message and try to avoid multibytes chars
762 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
763 if (role == sink_role)
764 {
765 // xgettext: please use short message and try to avoid multibytes chars
766 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
767 // xgettext: please use short message and try to avoid multibytes chars
768 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
769 }
770 else if (role == source_role)
771 {
772 // xgettext: please use short message and try to avoid multibytes chars
773 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
774 // xgettext: please use short message and try to avoid multibytes chars
775 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
776 }
777 else
778 {
779 I(role == source_and_sink_role);
780 // xgettext: please use short message and try to avoid multibytes chars
781 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
782 // xgettext: please use short message and try to avoid multibytes chars
783 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
784 }
785}
786
787bool
788session::done_all_refinements()
789{
790 bool all = rev_refiner.done
791 && cert_refiner.done
792 && key_refiner.done
793 && epoch_refiner.done;
794
795 if (all && !set_totals)
796 {
797 if (cert_out_ticker.get())
798 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
799
800 if (revision_out_ticker.get())
801 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
802
803 if (cert_in_ticker.get())
804 cert_in_ticker->set_total(cert_refiner.items_to_receive);
805
806 if (revision_in_ticker.get())
807 revision_in_ticker->set_total(rev_refiner.items_to_receive);
808
809 set_totals = true;
810 }
811 return all;
812}
813
814
815
816bool
817session::received_all_items()
818{
819 if (role == source_role)
820 return true;
821 bool all = rev_refiner.items_to_receive == 0
822 && cert_refiner.items_to_receive == 0
823 && key_refiner.items_to_receive == 0
824 && epoch_refiner.items_to_receive == 0;
825 return all;
826}
827
828bool
829session::finished_working()
830{
831 bool all = done_all_refinements()
832 && received_all_items()
833 && queued_all_items()
834 && rev_enumerator.done();
835 return all;
836}
837
838bool
839session::queued_all_items()
840{
841 if (role == sink_role)
842 return true;
843 bool all = rev_refiner.items_to_send.empty()
844 && cert_refiner.items_to_send.empty()
845 && key_refiner.items_to_send.empty()
846 && epoch_refiner.items_to_send.empty();
847 return all;
848}
849
850
851void
852session::maybe_note_epochs_finished()
853{
854 // Maybe there are outstanding epoch requests.
855 // These only matter if we're in sink or source-and-sink mode.
856 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
857 return;
858
859 // And maybe we haven't even finished the refinement.
860 if (!epoch_refiner.done)
861 return;
862
863 // If we ran into an error -- say a mismatched epoch -- don't do any
864 // further refinements.
865 if (encountered_error)
866 return;
867
868 // But otherwise, we're ready to go. Start the next
869 // set of refinements.
870 if (voice == client_voice)
871 {
872 L(FL("epoch refinement finished; beginning other refinements"));
873 key_refiner.begin_refinement();
874 cert_refiner.begin_refinement();
875 rev_refiner.begin_refinement();
876 }
877 else
878 L(FL("epoch refinement finished"));
879}
880
881static void
882decrement_if_nonzero(netcmd_item_type ty,
883 size_t & n)
884{
885 if (n == 0)
886 {
887 string typestr;
888 netcmd_item_type_to_string(ty, typestr);
889 E(false, F("underflow on count of %s items to receive") % typestr);
890 }
891 --n;
892 if (n == 0)
893 {
894 string typestr;
895 netcmd_item_type_to_string(ty, typestr);
896 L(FL("count of %s items to receive has reached zero") % typestr);
897 }
898}
899
900void
901session::note_item_arrived(netcmd_item_type ty, id const & ident)
902{
903 switch (ty)
904 {
905 case cert_item:
906 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
907 if (cert_in_ticker.get() != NULL)
908 ++(*cert_in_ticker);
909 ++certs_in;
910 break;
911 case revision_item:
912 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
913 if (revision_in_ticker.get() != NULL)
914 ++(*revision_in_ticker);
915 ++revs_in;
916 break;
917 case key_item:
918 decrement_if_nonzero(ty, key_refiner.items_to_receive);
919 ++keys_in;
920 break;
921 case epoch_item:
922 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
923 break;
924 default:
925 // No ticker for other things.
926 break;
927 }
928}
929
930
931
932void
933session::note_item_sent(netcmd_item_type ty, id const & ident)
934{
935 switch (ty)
936 {
937 case cert_item:
938 cert_refiner.items_to_send.erase(ident);
939 if (cert_out_ticker.get() != NULL)
940 ++(*cert_out_ticker);
941 ++certs_out;
942 break;
943 case revision_item:
944 rev_refiner.items_to_send.erase(ident);
945 if (revision_out_ticker.get() != NULL)
946 ++(*revision_out_ticker);
947 ++revs_out;
948 break;
949 case key_item:
950 key_refiner.items_to_send.erase(ident);
951 ++keys_out;
952 break;
953 case epoch_item:
954 epoch_refiner.items_to_send.erase(ident);
955 break;
956 default:
957 // No ticker for other things.
958 break;
959 }
960}
961
962void
963session::write_netcmd_and_try_flush(netcmd const & cmd)
964{
965 if (!encountered_error)
966 {
967 string buf;
968 cmd.write(buf, write_hmac);
969 outbuf.push_back(make_pair(buf, 0));
970 outbuf_size += buf.size();
971 }
972 else
973 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
974 // FIXME: this helps keep the protocol pipeline full but it seems to
975 // interfere with initial and final sequences. careful with it.
976 // write_some();
977 // read_some();
978}
979
980// This method triggers a special "error unwind" mode to netsync. In this
981// mode, all received data is ignored, and no new data is queued. We simply
982// stay connected long enough for the current write buffer to be flushed, to
983// ensure that our peer receives the error message.
984// Affects read_some, write_some, and process .
985void
986session::error(int errcode, string const & errmsg)
987{
988 error_code = errcode;
989 throw netsync_error(errmsg);
990}
991
992Netxx::Probe::ready_type
993session::which_events() const
994{
995 // Only ask to read if we're not armed.
996 if (outbuf.empty())
997 {
998 if (inbuf.size() < constants::netcmd_maxsz && !armed)
999 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1000 else
1001 return Netxx::Probe::ready_oobd;
1002 }
1003 else
1004 {
1005 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1006 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1007 else
1008 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1009 }
1010}
1011
1012bool
1013session::read_some()
1014{
1015 I(inbuf.size() < constants::netcmd_maxsz);
1016 char tmp[constants::bufsz];
1017 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1018 if (count > 0)
1019 {
1020 L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id);
1021 if (encountered_error)
1022 {
1023 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1024 return true;
1025 }
1026 inbuf.append(tmp,count);
1027 mark_recent_io();
1028 if (byte_in_ticker.get() != NULL)
1029 (*byte_in_ticker) += count;
1030 bytes_in += count;
1031 return true;
1032 }
1033 else
1034 return false;
1035}
1036
1037bool
1038session::write_some()
1039{
1040 I(!outbuf.empty());
1041 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1042 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1043 min(writelen,
1044 constants::bufsz));
1045 if (count > 0)
1046 {
1047 if ((size_t)count == writelen)
1048 {
1049 outbuf_size -= outbuf.front().first.size();
1050 outbuf.pop_front();
1051 }
1052 else
1053 {
1054 outbuf.front().second += count;
1055 }
1056 L(FL("wrote %d bytes to fd %d (peer %s)")
1057 % count % str->get_socketfd() % peer_id);
1058 mark_recent_io();
1059 if (byte_out_ticker.get() != NULL)
1060 (*byte_out_ticker) += count;
1061 bytes_out += count;
1062 if (encountered_error && outbuf.empty())
1063 {
1064 // we've flushed our error message, so it's time to get out.
1065 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1066 return false;
1067 }
1068 return true;
1069 }
1070 else
1071 return false;
1072}
1073
1074// senders
1075
1076void
1077session::queue_error_cmd(string const & errmsg)
1078{
1079 L(FL("queueing 'error' command"));
1080 netcmd cmd;
1081 cmd.write_error_cmd(errmsg);
1082 write_netcmd_and_try_flush(cmd);
1083}
1084
1085void
1086session::queue_bye_cmd(u8 phase)
1087{
1088 L(FL("queueing 'bye' command, phase %d")
1089 % static_cast<size_t>(phase));
1090 netcmd cmd;
1091 cmd.write_bye_cmd(phase);
1092 write_netcmd_and_try_flush(cmd);
1093}
1094
1095void
1096session::queue_done_cmd(netcmd_item_type type,
1097 size_t n_items)
1098{
1099 string typestr;
1100 netcmd_item_type_to_string(type, typestr);
1101 L(FL("queueing 'done' command for %s (%d items)")
1102 % typestr % n_items);
1103 netcmd cmd;
1104 cmd.write_done_cmd(type, n_items);
1105 write_netcmd_and_try_flush(cmd);
1106}
1107
1108void
1109session::queue_hello_cmd(rsa_keypair_id const & key_name,
1110 base64<rsa_pub_key> const & pub_encoded,
1111 id const & nonce)
1112{
1113 rsa_pub_key pub;
1114 if (app.opts.use_transport_auth)
1115 decode_base64(pub_encoded, pub);
1116 cmd.write_hello_cmd(key_name, pub, nonce);
1117 write_netcmd_and_try_flush(cmd);
1118}
1119
1120void
1121session::queue_anonymous_cmd(protocol_role role,
1122 globish const & include_pattern,
1123 globish const & exclude_pattern,
1124 id const & nonce2,
1125 base64<rsa_pub_key> server_key_encoded)
1126{
1127 netcmd cmd;
1128 rsa_oaep_sha_data hmac_key_encrypted;
1129 if (app.opts.use_transport_auth)
1130 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1131 nonce2(), hmac_key_encrypted);
1132 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1133 hmac_key_encrypted);
1134 write_netcmd_and_try_flush(cmd);
1135 set_session_key(nonce2());
1136}
1137
1138void
1139session::queue_auth_cmd(protocol_role role,
1140 globish const & include_pattern,
1141 globish const & exclude_pattern,
1142 id const & client,
1143 id const & nonce1,
1144 id const & nonce2,
1145 string const & signature,
1146 base64<rsa_pub_key> server_key_encoded)
1147{
1148 netcmd cmd;
1149 rsa_oaep_sha_data hmac_key_encrypted;
1150 I(app.opts.use_transport_auth);
1151 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1152 nonce2(), hmac_key_encrypted);
1153 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1154 nonce1, hmac_key_encrypted, signature);
1155 write_netcmd_and_try_flush(cmd);
1156 set_session_key(nonce2());
1157}
1158
1159void
1160session::queue_confirm_cmd()
1161{
1162 netcmd cmd;
1163 cmd.write_confirm_cmd();
1164 write_netcmd_and_try_flush(cmd);
1165}
1166
1167void
1168session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1169{
1170 string typestr;
1171 hexenc<prefix> hpref;
1172 node.get_hex_prefix(hpref);
1173 netcmd_item_type_to_string(node.type, typestr);
1174 L(FL("queueing refinement %s of %s node '%s', level %d")
1175 % (ty == refinement_query ? "query" : "response")
1176 % typestr % hpref % static_cast<int>(node.level));
1177 netcmd cmd;
1178 cmd.write_refine_cmd(ty, node);
1179 write_netcmd_and_try_flush(cmd);
1180}
1181
1182void
1183session::queue_data_cmd(netcmd_item_type type,
1184 id const & item,
1185 string const & dat)
1186{
1187 string typestr;
1188 netcmd_item_type_to_string(type, typestr);
1189 hexenc<id> hid;
1190 encode_hexenc(item, hid);
1191
1192 if (role == sink_role)
1193 {
1194 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1195 % typestr % hid);
1196 return;
1197 }
1198
1199 L(FL("queueing %d bytes of data for %s item '%s'")
1200 % dat.size() % typestr % hid);
1201
1202 netcmd cmd;
1203 // TODO: This pair of functions will make two copies of a large
1204 // file, the first in cmd.write_data_cmd, and the second in
1205 // write_netcmd_and_try_flush when the data is copied from the
1206 // cmd.payload variable to the string buffer for output. This
1207 // double copy should be collapsed out, it may be better to use
1208 // a string_queue for output as well as input, as that will reduce
1209 // the amount of mallocs that happen when the string queue is large
1210 // enough to just store the data.
1211 cmd.write_data_cmd(type, item, dat);
1212 write_netcmd_and_try_flush(cmd);
1213 note_item_sent(type, item);
1214}
1215
1216void
1217session::queue_delta_cmd(netcmd_item_type type,
1218 id const & base,
1219 id const & ident,
1220 delta const & del)
1221{
1222 I(type == file_item);
1223 string typestr;
1224 netcmd_item_type_to_string(type, typestr);
1225 hexenc<id> base_hid;
1226 encode_hexenc(base, base_hid);
1227 hexenc<id> ident_hid;
1228 encode_hexenc(ident, ident_hid);
1229
1230 if (role == sink_role)
1231 {
1232 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1233 % typestr % base_hid % ident_hid);
1234 return;
1235 }
1236
1237 L(FL("queueing %s delta '%s' -> '%s'")
1238 % typestr % base_hid % ident_hid);
1239 netcmd cmd;
1240 cmd.write_delta_cmd(type, base, ident, del);
1241 write_netcmd_and_try_flush(cmd);
1242 note_item_sent(type, ident);
1243}
1244
1245
1246// processors
1247
1248bool
1249session::process_error_cmd(string const & errmsg)
1250{
1251 // "xxx string" with xxx being digits means there's an error code
1252 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1253 {
1254 try
1255 {
1256 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1257 if (err >= 100)
1258 {
1259 error_code = err;
1260 throw bad_decode(F("received network error: %s")
1261 % errmsg.substr(4));
1262 }
1263 }
1264 catch (boost::bad_lexical_cast)
1265 { // ok, so it wasn't a number
1266 }
1267 }
1268 throw bad_decode(F("received network error: %s") % errmsg);
1269}
1270
1271static const var_domain known_servers_domain = var_domain("known-servers");
1272
1273bool
1274session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1275 rsa_pub_key const & their_key,
1276 id const & nonce)
1277{
1278 I(this->remote_peer_key_hash().size() == 0);
1279 I(this->saved_nonce().size() == 0);
1280
1281 base64<rsa_pub_key> their_key_encoded;
1282
1283 if (app.opts.use_transport_auth)
1284 {
1285 hexenc<id> their_key_hash;
1286 encode_base64(their_key, their_key_encoded);
1287 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1288 L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash);
1289 var_key their_key_key(known_servers_domain, var_name(peer_id));
1290 if (app.db.var_exists(their_key_key))
1291 {
1292 var_value expected_key_hash;
1293 app.db.get_var(their_key_key, expected_key_hash);
1294 if (expected_key_hash() != their_key_hash())
1295 {
1296 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1297 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1298 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1299 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1300 "it is also possible that the server key has just been changed\n"
1301 "remote host sent key %s\n"
1302 "I expected %s\n"
1303 "'%s unset %s %s' overrides this check")
1304 % their_key_hash % expected_key_hash
1305 % ui.prog_name % their_key_key.first % their_key_key.second);
1306 E(false, F("server key changed"));
1307 }
1308 }
1309 else
1310 {
1311 P(F("first time connecting to server %s\n"
1312 "I'll assume it's really them, but you might want to double-check\n"
1313 "their key's fingerprint: %s") % peer_id % their_key_hash);
1314 app.db.set_var(their_key_key, var_value(their_key_hash()));
1315 }
1316 if (app.db.put_key(their_keyname, their_key_encoded))
1317 W(F("saving public key for %s to database") % their_keyname);
1318
1319 {
1320 hexenc<id> hnonce;
1321 encode_hexenc(nonce, hnonce);
1322 L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
1323 % their_key_hash % hnonce);
1324 }
1325
1326 I(app.db.public_key_exists(their_key_hash));
1327
1328 // save their identity
1329 id their_key_hash_decoded;
1330 decode_hexenc(their_key_hash, their_key_hash_decoded);
1331 this->remote_peer_key_hash = their_key_hash_decoded;
1332 }
1333
1334 // clients always include in the synchronization set, every branch that the
1335 // user requested
1336 set<branch_name> all_branches, ok_branches;
1337 app.get_project().get_branch_list(all_branches, false);
1338 for (set<branch_name>::const_iterator i = all_branches.begin();
1339 i != all_branches.end(); i++)
1340 {
1341 if (our_matcher((*i)()))
1342 ok_branches.insert(*i);
1343 }
1344 rebuild_merkle_trees(app, ok_branches);
1345
1346 if (!initiated_by_server)
1347 setup_client_tickers();
1348
1349 if (app.opts.use_transport_auth &&
1350 app.opts.signing_key() != "")
1351 {
1352 // get our key pair
1353 keypair our_kp;
1354 load_key_pair(app, app.opts.signing_key, our_kp);
1355
1356 // get the hash identifier for our pubkey
1357 hexenc<id> our_key_hash;
1358 id our_key_hash_raw;
1359 key_hash_code(app.opts.signing_key, our_kp.pub, our_key_hash);
1360 decode_hexenc(our_key_hash, our_key_hash_raw);
1361
1362 // make a signature
1363 base64<rsa_sha1_signature> sig;
1364 rsa_sha1_signature sig_raw;
1365 make_signature(app, app.opts.signing_key, our_kp.priv, nonce(), sig);
1366 decode_base64(sig, sig_raw);
1367
1368 // make a new nonce of our own and send off the 'auth'
1369 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1370 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1371 their_key_encoded);
1372 }
1373 else
1374 {
1375 queue_anonymous_cmd(this->role, our_include_pattern,
1376 our_exclude_pattern, mk_nonce(), their_key_encoded);
1377 }
1378
1379 app.lua.hook_note_netsync_start(session_id, "client", this->role,
1380 peer_id, their_keyname,
1381 our_include_pattern, our_exclude_pattern);
1382
1383 return true;
1384}
1385
1386bool
1387session::process_anonymous_cmd(protocol_role their_role,
1388 globish const & their_include_pattern,
1389 globish const & their_exclude_pattern)
1390{
1391 // Internally netsync thinks in terms of sources and sinks. Users like
1392 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1393 //
1394 // We therefore use the read/write terminology when dealing with the UI:
1395 // if the user asks to run a "read only" service, this means they are
1396 // willing to be a source but not a sink.
1397 //
1398 // nb: The "role" here is the role the *client* wants to play
1399 // so we need to check that the opposite role is allowed for us,
1400 // in our this->role field.
1401 //
1402
1403 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1404 peer_id, rsa_keypair_id(),
1405 their_include_pattern, their_exclude_pattern);
1406
1407 // Client must be a sink and server must be a source (anonymous
1408 // read-only), unless transport auth is disabled.
1409 //
1410 // If running in no-transport-auth mode, we operate anonymously and
1411 // permit adoption of any role.
1412
1413 if (app.opts.use_transport_auth)
1414 {
1415 if (their_role != sink_role)
1416 {
1417 this->saved_nonce = id("");
1418 error(not_permitted,
1419 F("rejected attempt at anonymous connection for write").str());
1420 }
1421
1422 if (this->role == sink_role)
1423 {
1424 this->saved_nonce = id("");
1425 error(role_mismatch,
1426 F("rejected attempt at anonymous connection while running as sink").str());
1427 }
1428 }
1429
1430 set<branch_name> all_branches, ok_branches;
1431 app.get_project().get_branch_list(all_branches, false);
1432 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1433 for (set<branch_name>::const_iterator i = all_branches.begin();
1434 i != all_branches.end(); i++)
1435 {
1436 if (their_matcher((*i)()))
1437 {
1438 if (app.opts.use_transport_auth &&
1439 !app.lua.hook_get_netsync_read_permitted((*i)()))
1440 {
1441 error(not_permitted,
1442 (F("anonymous access to branch '%s' denied by server")
1443 % *i).str());
1444 }
1445 else
1446 ok_branches.insert(*i);
1447 }
1448 }
1449
1450 if (app.opts.use_transport_auth)
1451 {
1452 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1453 % their_include_pattern % their_exclude_pattern);
1454 this->role = source_role;
1455 }
1456 else
1457 {
1458 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1459 % their_include_pattern % their_exclude_pattern);
1460 assume_corresponding_role(their_role);
1461 }
1462
1463 rebuild_merkle_trees(app, ok_branches);
1464
1465 this->remote_peer_key_name = rsa_keypair_id("");
1466 this->authenticated = true;
1467 return true;
1468}
1469
1470void
1471session::assume_corresponding_role(protocol_role their_role)
1472{
1473 // Assume the (possibly degraded) opposite role.
1474 switch (their_role)
1475 {
1476 case source_role:
1477 I(this->role != source_role);
1478 this->role = sink_role;
1479 break;
1480
1481 case source_and_sink_role:
1482 I(this->role == source_and_sink_role);
1483 break;
1484
1485 case sink_role:
1486 I(this->role != sink_role);
1487 this->role = source_role;
1488 break;
1489 }
1490}
1491
1492bool
1493session::process_auth_cmd(protocol_role their_role,
1494 globish const & their_include_pattern,
1495 globish const & their_exclude_pattern,
1496 id const & client,
1497 id const & nonce1,
1498 string const & signature)
1499{
1500 I(this->remote_peer_key_hash().size() == 0);
1501 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1502
1503 hexenc<id> their_key_hash;
1504 encode_hexenc(client, their_key_hash);
1505
1506 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1507
1508 if (!app.db.public_key_exists(their_key_hash))
1509 {
1510 // If it's not in the db, it still could be in the keystore if we
1511 // have the private key that goes with it.
1512 if (!app.keys.try_ensure_in_db(their_key_hash))
1513 {
1514 this->saved_nonce = id("");
1515
1516 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1517 peer_id, rsa_keypair_id("-unknown-"),
1518 their_include_pattern,
1519 their_exclude_pattern);
1520 error(unknown_key,
1521 (F("remote public key hash '%s' is unknown") % their_key_hash).str());
1522 }
1523 }
1524
1525 // Get their public key.
1526 rsa_keypair_id their_id;
1527 base64<rsa_pub_key> their_key;
1528 app.db.get_pubkey(their_key_hash, their_id, their_key);
1529
1530 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1531 peer_id, their_id,
1532 their_include_pattern, their_exclude_pattern);
1533
1534 // Check that they replied with the nonce we asked for.
1535 if (!(nonce1 == this->saved_nonce))
1536 {
1537 this->saved_nonce = id("");
1538 error(failed_identification,
1539 F("detected replay attack in auth netcmd").str());
1540 }
1541
1542 // Internally netsync thinks in terms of sources and sinks. users like
1543 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1544 //
1545 // We therefore use the read/write terminology when dealing with the UI:
1546 // if the user asks to run a "read only" service, this means they are
1547 // willing to be a source but not a sink.
1548 //
1549 // nb: The "their_role" here is the role the *client* wants to play
1550 // so we need to check that the opposite role is allowed for us,
1551 // in our this->role field.
1552
1553 // Client as sink, server as source (reading).
1554
1555 if (their_role == sink_role || their_role == source_and_sink_role)
1556 {
1557 if (this->role != source_role && this->role != source_and_sink_role)
1558 {
1559 this->saved_nonce = id("");
1560 error(not_permitted,
1561 (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
1562 % their_id % their_include_pattern % their_exclude_pattern).str());
1563 }
1564 }
1565
1566 set<branch_name> all_branches, ok_branches;
1567 app.get_project().get_branch_list(all_branches, false);
1568 for (set<branch_name>::const_iterator i = all_branches.begin();
1569 i != all_branches.end(); i++)
1570 {
1571 if (their_matcher((*i)()))
1572 {
1573 if (!app.lua.hook_get_netsync_read_permitted((*i)(), their_id))
1574 {
1575 error(not_permitted,
1576 (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
1577 % their_id % their_include_pattern % their_exclude_pattern % *i).str());
1578 }
1579 else
1580 ok_branches.insert(*i);
1581 }
1582 }
1583
1584 // If we're source_and_sink_role, continue even with no branches readable
1585 // eg. serve --db=empty.db
1586 P(F("allowed '%s' read permission for '%s' excluding '%s'")
1587 % their_id % their_include_pattern % their_exclude_pattern);
1588
1589 // Client as source, server as sink (writing).
1590
1591 if (their_role == source_role || their_role == source_and_sink_role)
1592 {
1593 if (this->role != sink_role && this->role != source_and_sink_role)
1594 {
1595 this->saved_nonce = id("");
1596 error(not_permitted,
1597 (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
1598 % their_id % their_include_pattern % their_exclude_pattern).str());
1599 }
1600
1601 if (!app.lua.hook_get_netsync_write_permitted(their_id))
1602 {
1603 this->saved_nonce = id("");
1604 error(not_permitted,
1605 (F("denied '%s' write permission for '%s' excluding '%s'")
1606 % their_id % their_include_pattern % their_exclude_pattern).str());
1607 }
1608
1609 P(F("allowed '%s' write permission for '%s' excluding '%s'")
1610 % their_id % their_include_pattern % their_exclude_pattern);
1611 }
1612
1613 rebuild_merkle_trees(app, ok_branches);
1614
1615 // Save their identity.
1616 this->remote_peer_key_hash = client;
1617
1618 // Check the signature.
1619 base64<rsa_sha1_signature> sig;
1620 encode_base64(rsa_sha1_signature(signature), sig);
1621 if (check_signature(app, their_id, their_key, nonce1(), sig))
1622 {
1623 // Get our private key and sign back.
1624 L(FL("client signature OK, accepting authentication"));
1625 this->authenticated = true;
1626 this->remote_peer_key_name = their_id;
1627
1628 assume_corresponding_role(their_role);
1629 return true;
1630 }
1631 else
1632 {
1633 error(failed_identification, (F("bad client signature")).str());
1634 }
1635 return false;
1636}
1637
1638bool
1639session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1640{
1641 string typestr;
1642 netcmd_item_type_to_string(node.type, typestr);
1643 L(FL("processing refine cmd for %s node at level %d")
1644 % typestr % node.level);
1645
1646 switch (node.type)
1647 {
1648 case file_item:
1649 W(F("Unexpected 'refine' command on non-refined item type"));
1650 break;
1651
1652 case key_item:
1653 key_refiner.process_refinement_command(ty, node);
1654 break;
1655
1656 case revision_item:
1657 rev_refiner.process_refinement_command(ty, node);
1658 break;
1659
1660 case cert_item:
1661 cert_refiner.process_refinement_command(ty, node);
1662 break;
1663
1664 case epoch_item:
1665 epoch_refiner.process_refinement_command(ty, node);
1666 break;
1667 }
1668 return true;
1669}
1670
1671bool
1672session::process_bye_cmd(u8 phase,
1673 transaction_guard & guard)
1674{
1675
1676// Ideal shutdown
1677// ~~~~~~~~~~~~~~~
1678//
1679// I/O events state transitions
1680// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1681// client: C_WORKING
1682// server: S_WORKING
1683// 0. [refinement, data, deltas, etc.]
1684// client: C_SHUTDOWN
1685// (client checkpoints here)
1686// 1. client -> "bye 0"
1687// 2. "bye 0" -> server
1688// server: S_SHUTDOWN
1689// (server checkpoints here)
1690// 3. "bye 1" <- server
1691// 4. client <- "bye 1"
1692// client: C_CONFIRMED
1693// 5. client -> "bye 2"
1694// 6. "bye 2" -> server
1695// server: S_CONFIRMED
1696// 7. [server drops connection]
1697//
1698//
1699// Affects of I/O errors or disconnections
1700// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1701// C_WORKING: report error and fault
1702// S_WORKING: report error and recover
1703// C_SHUTDOWN: report error and fault
1704// S_SHUTDOWN: report success and recover
1705// (and warn that client might falsely see error)
1706// C_CONFIRMED: report success
1707// S_CONFIRMED: report success
1708
1709 switch (phase)
1710 {
1711 case 0:
1712 if (voice == server_voice &&
1713 protocol_state == working_state)
1714 {
1715 protocol_state = shutdown_state;
1716 guard.do_checkpoint();
1717 queue_bye_cmd(1);
1718 }
1719 else
1720 error(bad_command, "unexpected bye phase 0 received");
1721 break;
1722
1723 case 1:
1724 if (voice == client_voice &&
1725 protocol_state == shutdown_state)
1726 {
1727 protocol_state = confirmed_state;
1728 queue_bye_cmd(2);
1729 }
1730 else
1731 error(bad_command, "unexpected bye phase 1 received");
1732 break;
1733
1734 case 2:
1735 if (voice == server_voice &&
1736 protocol_state == shutdown_state)
1737 {
1738 protocol_state = confirmed_state;
1739 return false;
1740 }
1741 else
1742 error(bad_command, "unexpected bye phase 2 received");
1743 break;
1744
1745 default:
1746 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1747 }
1748
1749 return true;
1750}
1751
1752bool
1753session::process_done_cmd(netcmd_item_type type, size_t n_items)
1754{
1755 string typestr;
1756 netcmd_item_type_to_string(type, typestr);
1757 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1758 switch (type)
1759 {
1760 case file_item:
1761 W(F("Unexpected 'done' command on non-refined item type"));
1762 break;
1763
1764 case key_item:
1765 key_refiner.process_done_command(n_items);
1766 if (key_refiner.done && role != sink_role)
1767 send_all_data(key_item, key_refiner.items_to_send);
1768 break;
1769
1770 case revision_item:
1771 rev_refiner.process_done_command(n_items);
1772 break;
1773
1774 case cert_item:
1775 cert_refiner.process_done_command(n_items);
1776 break;
1777
1778 case epoch_item:
1779 epoch_refiner.process_done_command(n_items);
1780 if (epoch_refiner.done)
1781 {
1782 send_all_data(epoch_item, epoch_refiner.items_to_send);
1783 maybe_note_epochs_finished();
1784 }
1785 break;
1786 }
1787 return true;
1788}
1789
1790void
1791session::respond_to_confirm_cmd()
1792{
1793 epoch_refiner.begin_refinement();
1794}
1795
1796bool
1797session::data_exists(netcmd_item_type type,
1798 id const & item)
1799{
1800 hexenc<id> hitem;
1801 encode_hexenc(item, hitem);
1802 switch (type)
1803 {
1804 case key_item:
1805 return key_refiner.local_item_exists(item)
1806 || app.db.public_key_exists(hitem);
1807 case file_item:
1808 return app.db.file_version_exists(file_id(hitem));
1809 case revision_item:
1810 return rev_refiner.local_item_exists(item)
1811 || app.db.revision_exists(revision_id(hitem));
1812 case cert_item:
1813 return cert_refiner.local_item_exists(item)
1814 || app.db.revision_cert_exists(hitem);
1815 case epoch_item:
1816 return epoch_refiner.local_item_exists(item)
1817 || app.db.epoch_exists(epoch_id(hitem));
1818 }
1819 return false;
1820}
1821
1822void
1823session::load_data(netcmd_item_type type,
1824 id const & item,
1825 string & out)
1826{
1827 string typestr;
1828 netcmd_item_type_to_string(type, typestr);
1829 hexenc<id> hitem;
1830 encode_hexenc(item, hitem);
1831
1832 if (!data_exists(type, item))
1833 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1834 % typestr % hitem);
1835
1836 switch (type)
1837 {
1838 case epoch_item:
1839 {
1840 branch_name branch;
1841 epoch_data epoch;
1842 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1843 write_epoch(branch, epoch, out);
1844 }
1845 break;
1846 case key_item:
1847 {
1848 rsa_keypair_id keyid;
1849 base64<rsa_pub_key> pub_encoded;
1850 app.db.get_pubkey(hitem, keyid, pub_encoded);
1851 L(FL("public key '%s' is also called '%s'") % hitem % keyid);
1852 write_pubkey(keyid, pub_encoded, out);
1853 }
1854 break;
1855
1856 case revision_item:
1857 {
1858 revision_data mdat;
1859 data dat;
1860 app.db.get_revision(revision_id(hitem), mdat);
1861 out = mdat.inner()();
1862 }
1863 break;
1864
1865 case file_item:
1866 {
1867 file_data fdat;
1868 data dat;
1869 app.db.get_file_version(file_id(hitem), fdat);
1870 out = fdat.inner()();
1871 }
1872 break;
1873
1874 case cert_item:
1875 {
1876 revision<cert> c;
1877 app.db.get_revision_cert(hitem, c);
1878 string tmp;
1879 write_cert(c.inner(), out);
1880 }
1881 break;
1882 }
1883}
1884
1885bool
1886session::process_data_cmd(netcmd_item_type type,
1887 id const & item,
1888 string const & dat)
1889{
1890 hexenc<id> hitem;
1891 encode_hexenc(item, hitem);
1892
1893 string typestr;
1894 netcmd_item_type_to_string(type, typestr);
1895
1896 note_item_arrived(type, item);
1897 if (data_exists(type, item))
1898 {
1899 L(FL("%s '%s' already exists in our database") % typestr % hitem);
1900 if (type == epoch_item)
1901 maybe_note_epochs_finished();
1902 return true;
1903 }
1904
1905 switch (type)
1906 {
1907 case epoch_item:
1908 {
1909 branch_name branch;
1910 epoch_data epoch;
1911 read_epoch(dat, branch, epoch);
1912 L(FL("received epoch %s for branch %s") % epoch % branch);
1913 map<branch_name, epoch_data> epochs;
1914 app.db.get_epochs(epochs);
1915 map<branch_name, epoch_data>::const_iterator i;
1916 i = epochs.find(branch);
1917 if (i == epochs.end())
1918 {
1919 L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
1920 app.db.set_epoch(branch, epoch);
1921 }
1922 else
1923 {
1924 L(FL("branch %s already has an epoch; checking") % branch);
1925 // If we get here, then we know that the epoch must be
1926 // different, because if it were the same then the
1927 // if (epoch_exists()) branch up above would have been taken.
1928 // if somehow this is wrong, then we have broken epoch
1929 // hashing or something, which is very dangerous, so play it
1930 // safe...
1931 I(!(i->second == epoch));
1932
1933 // It is safe to call 'error' here, because if we get here,
1934 // then the current netcmd packet cannot possibly have
1935 // written anything to the database.
1936 error(mixing_versions,
1937 (F("Mismatched epoch on branch %s."
1938 " Server has '%s', client has '%s'.")
1939 % branch
1940 % (voice == server_voice ? i->second : epoch)
1941 % (voice == server_voice ? epoch : i->second)).str());
1942 }
1943 }
1944 maybe_note_epochs_finished();
1945 break;
1946
1947 case key_item:
1948 {
1949 rsa_keypair_id keyid;
1950 base64<rsa_pub_key> pub;
1951 read_pubkey(dat, keyid, pub);
1952 hexenc<id> tmp;
1953 key_hash_code(keyid, pub, tmp);
1954 if (! (tmp == hitem))
1955 throw bad_decode(F("hash check failed for public key '%s' (%s);"
1956 " wanted '%s' got '%s'")
1957 % hitem % keyid % hitem % tmp);
1958 if (app.db.put_key(keyid, pub))
1959 written_keys.push_back(keyid);
1960 else
1961 error(partial_transfer,
1962 (F("Received duplicate key %s") % keyid).str());
1963 }
1964 break;
1965
1966 case cert_item:
1967 {
1968 cert c;
1969 read_cert(dat, c);
1970 hexenc<id> tmp;
1971 cert_hash_code(c, tmp);
1972 if (! (tmp == hitem))
1973 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
1974 if (app.db.put_revision_cert(revision<cert>(c)))
1975 written_certs.push_back(c);
1976 }
1977 break;
1978
1979 case revision_item:
1980 {
1981 L(FL("received revision '%s'") % hitem);
1982 if (app.db.put_revision(revision_id(hitem), revision_data(dat)))
1983 written_revisions.push_back(revision_id(hitem));
1984 }
1985 break;
1986
1987 case file_item:
1988 {
1989 L(FL("received file '%s'") % hitem);
1990 app.db.put_file(file_id(hitem), file_data(dat));
1991 }
1992 break;
1993 }
1994 return true;
1995}
1996
1997bool
1998session::process_delta_cmd(netcmd_item_type type,
1999 id const & base,
2000 id const & ident,
2001 delta const & del)
2002{
2003 string typestr;
2004 netcmd_item_type_to_string(type, typestr);
2005 hexenc<id> hbase, hident;
2006 encode_hexenc(base, hbase);
2007 encode_hexenc(ident, hident);
2008
2009 pair<id,id> id_pair = make_pair(base, ident);
2010
2011 note_item_arrived(type, ident);
2012
2013 switch (type)
2014 {
2015 case file_item:
2016 {
2017 file_id src_file(hbase), dst_file(hident);
2018 app.db.put_file_version(src_file, dst_file, file_delta(del));
2019 }
2020 break;
2021
2022 default:
2023 L(FL("ignoring delta received for item type %s") % typestr);
2024 break;
2025 }
2026 return true;
2027}
2028
2029bool
2030session::process_usher_cmd(utf8 const & msg)
2031{
2032 if (msg().size())
2033 {
2034 if (msg()[0] == '!')
2035 P(F("Received warning from usher: %s") % msg().substr(1));
2036 else
2037 L(FL("Received greeting from usher: %s") % msg().substr(1));
2038 }
2039 netcmd cmdout;
2040 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2041 write_netcmd_and_try_flush(cmdout);
2042 L(FL("Sent reply."));
2043 return true;
2044}
2045
2046
2047void
2048session::send_all_data(netcmd_item_type ty, set<id> const & items)
2049{
2050 string typestr;
2051 netcmd_item_type_to_string(ty, typestr);
2052
2053 // Use temporary; passed arg will be invalidated during iteration.
2054 set<id> tmp = items;
2055
2056 for (set<id>::const_iterator i = tmp.begin();
2057 i != tmp.end(); ++i)
2058 {
2059 hexenc<id> hitem;
2060 encode_hexenc(*i, hitem);
2061
2062 if (data_exists(ty, *i))
2063 {
2064 string out;
2065 load_data(ty, *i, out);
2066 queue_data_cmd(ty, *i, out);
2067 }
2068 }
2069}
2070
2071bool
2072session::dispatch_payload(netcmd const & cmd,
2073 transaction_guard & guard)
2074{
2075
2076 switch (cmd.get_cmd_code())
2077 {
2078
2079 case error_cmd:
2080 {
2081 string errmsg;
2082 cmd.read_error_cmd(errmsg);
2083 return process_error_cmd(errmsg);
2084 }
2085 break;
2086
2087 case hello_cmd:
2088 require(! authenticated, "hello netcmd received when not authenticated");
2089 require(voice == client_voice, "hello netcmd received in client voice");
2090 {
2091 rsa_keypair_id server_keyname;
2092 rsa_pub_key server_key;
2093 id nonce;
2094 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2095 return process_hello_cmd(server_keyname, server_key, nonce);
2096 }
2097 break;
2098
2099 case bye_cmd:
2100 require(authenticated, "bye netcmd received when not authenticated");
2101 {
2102 u8 phase;
2103 cmd.read_bye_cmd(phase);
2104 return process_bye_cmd(phase, guard);
2105 }
2106 break;
2107
2108 case anonymous_cmd:
2109 require(! authenticated, "anonymous netcmd received when not authenticated");
2110 require(voice == server_voice, "anonymous netcmd received in server voice");
2111 require(role == source_role ||
2112 role == source_and_sink_role,
2113 "anonymous netcmd received in source or source/sink role");
2114 {
2115 protocol_role role;
2116 globish their_include_pattern, their_exclude_pattern;
2117 rsa_oaep_sha_data hmac_key_encrypted;
2118 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2119 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2120 "in %s mode\n")
2121 % their_include_pattern % their_exclude_pattern
2122 % (role == source_and_sink_role ? _("source and sink") :
2123 (role == source_role ? _("source") : _("sink"))));
2124
2125 set_session_key(hmac_key_encrypted);
2126 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2127 return false;
2128 queue_confirm_cmd();
2129 return true;
2130 }
2131 break;
2132
2133 case auth_cmd:
2134 require(! authenticated, "auth netcmd received when not authenticated");
2135 require(voice == server_voice, "auth netcmd received in server voice");
2136 {
2137 protocol_role role;
2138 string signature;
2139 globish their_include_pattern, their_exclude_pattern;
2140 id client, nonce1, nonce2;
2141 rsa_oaep_sha_data hmac_key_encrypted;
2142 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2143 client, nonce1, hmac_key_encrypted, signature);
2144
2145 hexenc<id> their_key_hash;
2146 encode_hexenc(client, their_key_hash);
2147 hexenc<id> hnonce1;
2148 encode_hexenc(nonce1, hnonce1);
2149
2150 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2151 "exclude '%s' in %s mode with nonce1 '%s'\n")
2152 % their_key_hash % their_include_pattern % their_exclude_pattern
2153 % (role == source_and_sink_role ? _("source and sink") :
2154 (role == source_role ? _("source") : _("sink")))
2155 % hnonce1);
2156
2157 set_session_key(hmac_key_encrypted);
2158
2159 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2160 client, nonce1, signature))
2161 return false;
2162 queue_confirm_cmd();
2163 return true;
2164 }
2165 break;
2166
2167 case confirm_cmd:
2168 require(! authenticated, "confirm netcmd received when not authenticated");
2169 require(voice == client_voice, "confirm netcmd received in client voice");
2170 {
2171 string signature;
2172 cmd.read_confirm_cmd();
2173 this->authenticated = true;
2174 respond_to_confirm_cmd();
2175 return true;
2176 }
2177 break;
2178
2179 case refine_cmd:
2180 require(authenticated, "refine netcmd received when authenticated");
2181 {
2182 merkle_node node;
2183 refinement_type ty;
2184 cmd.read_refine_cmd(ty, node);
2185 return process_refine_cmd(ty, node);
2186 }
2187 break;
2188
2189 case done_cmd:
2190 require(authenticated, "done netcmd received when not authenticated");
2191 {
2192 size_t n_items;
2193 netcmd_item_type type;
2194 cmd.read_done_cmd(type, n_items);
2195 return process_done_cmd(type, n_items);
2196 }
2197 break;
2198
2199 case data_cmd:
2200 require(authenticated, "data netcmd received when not authenticated");
2201 require(role == sink_role ||
2202 role == source_and_sink_role,
2203 "data netcmd received in source or source/sink role");
2204 {
2205 netcmd_item_type type;
2206 id item;
2207 string dat;
2208 cmd.read_data_cmd(type, item, dat);
2209 return process_data_cmd(type, item, dat);
2210 }
2211 break;
2212
2213 case delta_cmd:
2214 require(authenticated, "delta netcmd received when not authenticated");
2215 require(role == sink_role ||
2216 role == source_and_sink_role,
2217 "delta netcmd received in source or source/sink role");
2218 {
2219 netcmd_item_type type;
2220 id base, ident;
2221 delta del;
2222 cmd.read_delta_cmd(type, base, ident, del);
2223 return process_delta_cmd(type, base, ident, del);
2224 }
2225 break;
2226
2227 case usher_cmd:
2228 {
2229 utf8 greeting;
2230 cmd.read_usher_cmd(greeting);
2231 return process_usher_cmd(greeting);
2232 }
2233 break;
2234
2235 case usher_reply_cmd:
2236 return false; // Should not happen.
2237 break;
2238 }
2239 return false;
2240}
2241
2242// This kicks off the whole cascade starting from "hello".
2243void
2244session::begin_service()
2245{
2246 keypair kp;
2247 if (app.opts.use_transport_auth)
2248 app.keys.get_key_pair(app.opts.signing_key, kp);
2249 queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce());
2250}
2251
2252void
2253session::maybe_step()
2254{
2255 while (done_all_refinements()
2256 && !rev_enumerator.done()
2257 && outbuf_size < constants::bufsz * 10)
2258 {
2259 rev_enumerator.step();
2260 }
2261}
2262
2263void
2264session::maybe_say_goodbye(transaction_guard & guard)
2265{
2266 if (voice == client_voice
2267 && protocol_state == working_state
2268 && finished_working())
2269 {
2270 protocol_state = shutdown_state;
2271 guard.do_checkpoint();
2272 queue_bye_cmd(0);
2273 }
2274}
2275
2276bool
2277session::arm()
2278{
2279 if (!armed)
2280 {
2281 // Don't pack the buffer unnecessarily.
2282 if (outbuf_size > constants::bufsz * 10)
2283 return false;
2284
2285 if (cmd.read(inbuf, read_hmac))
2286 {
2287 armed = true;
2288 }
2289 }
2290 return armed;
2291}
2292
2293bool session::process(transaction_guard & guard)
2294{
2295 if (encountered_error)
2296 return true;
2297 try
2298 {
2299 if (!arm())
2300 return true;
2301
2302 armed = false;
2303 L(FL("processing %d byte input buffer from peer %s")
2304 % inbuf.size() % peer_id);
2305
2306 size_t sz = cmd.encoded_size();
2307 bool ret = dispatch_payload(cmd, guard);
2308
2309 if (inbuf.size() >= constants::netcmd_maxsz)
2310 W(F("input buffer for peer %s is overfull "
2311 "after netcmd dispatch") % peer_id);
2312
2313 guard.maybe_checkpoint(sz);
2314
2315 if (!ret)
2316 L(FL("finishing processing with '%d' packet")
2317 % cmd.get_cmd_code());
2318 return ret;
2319 }
2320 catch (bad_decode & bd)
2321 {
2322 W(F("protocol error while processing peer %s: '%s'")
2323 % peer_id % bd.what);
2324 return false;
2325 }
2326 catch (netsync_error & err)
2327 {
2328 W(F("error: %s") % err.msg);
2329 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2330 encountered_error = true;
2331 return true; // Don't terminate until we've send the error_cmd.
2332 }
2333}
2334
2335
2336static shared_ptr<Netxx::StreamBase>
2337build_stream_to_server(app_state & app,
2338 globish const & include_pattern,
2339 globish const & exclude_pattern,
2340 utf8 const & address,
2341 Netxx::port_type default_port,
2342 Netxx::Timeout timeout)
2343{
2344 shared_ptr<Netxx::StreamBase> server;
2345 uri u;
2346 vector<string> argv;
2347
2348 parse_uri(address(), u);
2349 if (app.lua.hook_get_netsync_connect_command(u,
2350 include_pattern,
2351 exclude_pattern,
2352 global_sanity.debug_p(),
2353 argv))
2354 {
2355 I(argv.size() > 0);
2356 string cmd = argv[0];
2357 argv.erase(argv.begin());
2358 app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u);
2359 return shared_ptr<Netxx::StreamBase>
2360 (new Netxx::PipeStream(cmd, argv));
2361
2362 }
2363 else
2364 {
2365#ifdef USE_IPV6
2366 bool use_ipv6=true;
2367#else
2368 bool use_ipv6=false;
2369#endif
2370 Netxx::Address addr(address().c_str(),
2371 default_port, use_ipv6);
2372 return shared_ptr<Netxx::StreamBase>
2373 (new Netxx::Stream(addr, timeout));
2374 }
2375}
2376
2377static void
2378call_server(protocol_role role,
2379 globish const & include_pattern,
2380 globish const & exclude_pattern,
2381 app_state & app,
2382 std::list<utf8> const & addresses,
2383 Netxx::port_type default_port,
2384 unsigned long timeout_seconds)
2385{
2386 Netxx::PipeCompatibleProbe probe;
2387 transaction_guard guard(app.db);
2388 I(addresses.size() == 1);
2389 utf8 address(*addresses.begin());
2390
2391 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2392
2393 P(F("connecting to %s") % address);
2394
2395 shared_ptr<Netxx::StreamBase> server
2396 = build_stream_to_server(app,
2397 include_pattern,
2398 exclude_pattern,
2399 address, default_port,
2400 timeout);
2401
2402
2403 // 'false' here means not to revert changes when the SockOpt
2404 // goes out of scope.
2405 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2406 socket_options.set_non_blocking();
2407
2408 session sess(role, client_voice,
2409 include_pattern,
2410 exclude_pattern,
2411 app, address(), server);
2412
2413 while (true)
2414 {
2415 bool armed = false;
2416 try
2417 {
2418 armed = sess.arm();
2419 }
2420 catch (bad_decode & bd)
2421 {
2422 E(false, F("protocol error while processing peer %s: '%s'")
2423 % sess.peer_id % bd.what);
2424 }
2425
2426 sess.maybe_step();
2427 sess.maybe_say_goodbye(guard);
2428
2429 probe.clear();
2430 probe.add(*(sess.str), sess.which_events());
2431 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2432 Netxx::Probe::ready_type event = res.second;
2433 Netxx::socket_type fd = res.first;
2434
2435 if (fd == -1 && !armed)
2436 {
2437 E(false, (F("timed out waiting for I/O with "
2438 "peer %s, disconnecting")
2439 % sess.peer_id));
2440 }
2441
2442 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2443
2444 if (event & Netxx::Probe::ready_read)
2445 all_io_clean = all_io_clean && sess.read_some();
2446
2447 if (event & Netxx::Probe::ready_write)
2448 all_io_clean = all_io_clean && sess.write_some();
2449
2450 if (armed)
2451 if (!sess.process(guard))
2452 {
2453 // Commit whatever work we managed to accomplish anyways.
2454 guard.commit();
2455
2456 // We failed during processing. This should only happen in
2457 // client voice when we have a decode exception, or received an
2458 // error from our server (which is translated to a decode
2459 // exception). We call these cases E() errors.
2460 E(false, F("processing failure while talking to "
2461 "peer %s, disconnecting")
2462 % sess.peer_id);
2463 return;
2464 }
2465
2466 if (!all_io_clean)
2467 {
2468 // Commit whatever work we managed to accomplish anyways.
2469 guard.commit();
2470
2471 // We had an I/O error. We must decide if this represents a
2472 // user-reported error or a clean disconnect. See protocol
2473 // state diagram in session::process_bye_cmd.
2474
2475 if (sess.protocol_state == session::confirmed_state)
2476 {
2477 P(F("successful exchange with %s")
2478 % sess.peer_id);
2479 return;
2480 }
2481 else if (sess.encountered_error)
2482 {
2483 P(F("peer %s disconnected after we informed them of error")
2484 % sess.peer_id);
2485 return;
2486 }
2487 else
2488 E(false, (F("I/O failure while talking to "
2489 "peer %s, disconnecting")
2490 % sess.peer_id));
2491 }
2492 }
2493}
2494
2495static void
2496drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2497 Netxx::socket_type fd)
2498{
2499 // This is a bit of a hack. Initially all "file descriptors" in
2500 // netsync were full duplex, so we could get away with indexing
2501 // sessions by their file descriptor.
2502 //
2503 // When using pipes in unix, it's no longer true: a session gets
2504 // entered in the session map under its read pipe fd *and* its write
2505 // pipe fd. When we're in such a situation the socket fd is "-1" and
2506 // we downcast to a PipeStream and use its read+write fds.
2507 //
2508 // When using pipes in windows, we use a full duplex pipe (named
2509 // pipe) so the socket-like abstraction holds.
2510
2511 I(fd != -1);
2512 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2513 I(i != sessions.end());
2514 shared_ptr<session> sess = i->second;
2515 fd = sess->str->get_socketfd();
2516 if (fd != -1)
2517 {
2518 sessions.erase(fd);
2519 }
2520 else
2521 {
2522 shared_ptr<Netxx::PipeStream> pipe =
2523 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2524 I(static_cast<bool>(pipe));
2525 I(pipe->get_writefd() != -1);
2526 I(pipe->get_readfd() != -1);
2527 sessions.erase(pipe->get_readfd());
2528 sessions.erase(pipe->get_writefd());
2529 }
2530}
2531
2532static void
2533arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2534 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2535 set<Netxx::socket_type> & armed_sessions,
2536 transaction_guard & guard)
2537{
2538 set<Netxx::socket_type> arm_failed;
2539 for (map<Netxx::socket_type,
2540 shared_ptr<session> >::const_iterator i = sessions.begin();
2541 i != sessions.end(); ++i)
2542 {
2543 i->second->maybe_step();
2544 i->second->maybe_say_goodbye(guard);
2545 try
2546 {
2547 if (i->second->arm())
2548 {
2549 L(FL("fd %d is armed") % i->first);
2550 armed_sessions.insert(i->first);
2551 }
2552 probe.add(*i->second->str, i->second->which_events());
2553 }
2554 catch (bad_decode & bd)
2555 {
2556 W(F("protocol error while processing peer %s: '%s', marking as bad")
2557 % i->second->peer_id % bd.what);
2558 arm_failed.insert(i->first);
2559 }
2560 }
2561 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2562 i != arm_failed.end(); ++i)
2563 {
2564 drop_session_associated_with_fd(sessions, *i);
2565 }
2566}
2567
2568static void
2569handle_new_connection(Netxx::Address & addr,
2570 Netxx::StreamServer & server,
2571 Netxx::Timeout & timeout,
2572 protocol_role role,
2573 globish const & include_pattern,
2574 globish const & exclude_pattern,
2575 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2576 app_state & app)
2577{
2578 L(FL("accepting new connection on %s : %s")
2579 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2580 Netxx::Peer client = server.accept_connection();
2581
2582 if (!client)
2583 {
2584 L(FL("accept() returned a dead client"));
2585 }
2586 else
2587 {
2588 P(F("accepted new client connection from %s : %s")
2589 % client.get_address() % lexical_cast<string>(client.get_port()));
2590
2591 // 'false' here means not to revert changes when the SockOpt
2592 // goes out of scope.
2593 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2594 socket_options.set_non_blocking();
2595
2596 shared_ptr<Netxx::Stream> str =
2597 shared_ptr<Netxx::Stream>
2598 (new Netxx::Stream(client.get_socketfd(), timeout));
2599
2600 shared_ptr<session> sess(new session(role, server_voice,
2601 include_pattern, exclude_pattern,
2602 app,
2603 lexical_cast<string>(client), str));
2604 sess->begin_service();
2605 sessions.insert(make_pair(client.get_socketfd(), sess));
2606 }
2607}
2608
2609static void
2610handle_read_available(Netxx::socket_type fd,
2611 shared_ptr<session> sess,
2612 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2613 set<Netxx::socket_type> & armed_sessions,
2614 bool & live_p)
2615{
2616 if (sess->read_some())
2617 {
2618 try
2619 {
2620 if (sess->arm())
2621 armed_sessions.insert(fd);
2622 }
2623 catch (bad_decode & bd)
2624 {
2625 W(F("protocol error while processing peer %s: '%s', disconnecting")
2626 % sess->peer_id % bd.what);
2627 drop_session_associated_with_fd(sessions, fd);
2628 live_p = false;
2629 }
2630 }
2631 else
2632 {
2633 switch (sess->protocol_state)
2634 {
2635 case session::working_state:
2636 P(F("peer %s read failed in working state (error)")
2637 % sess->peer_id);
2638 break;
2639
2640 case session::shutdown_state:
2641 P(F("peer %s read failed in shutdown state "
2642 "(possibly client misreported error)")
2643 % sess->peer_id);
2644 break;
2645
2646 case session::confirmed_state:
2647 P(F("peer %s read failed in confirmed state (success)")
2648 % sess->peer_id);
2649 break;
2650 }
2651 drop_session_associated_with_fd(sessions, fd);
2652 live_p = false;
2653 }
2654}
2655
2656
2657static void
2658handle_write_available(Netxx::socket_type fd,
2659 shared_ptr<session> sess,
2660 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2661 bool & live_p)
2662{
2663 if (!sess->write_some())
2664 {
2665 switch (sess->protocol_state)
2666 {
2667 case session::working_state:
2668 P(F("peer %s write failed in working state (error)")
2669 % sess->peer_id);
2670 break;
2671
2672 case session::shutdown_state:
2673 P(F("peer %s write failed in shutdown state "
2674 "(possibly client misreported error)")
2675 % sess->peer_id);
2676 break;
2677
2678 case session::confirmed_state:
2679 P(F("peer %s write failed in confirmed state (success)")
2680 % sess->peer_id);
2681 break;
2682 }
2683
2684 drop_session_associated_with_fd(sessions, fd);
2685 live_p = false;
2686 }
2687}
2688
2689static void
2690process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2691 set<Netxx::socket_type> & armed_sessions,
2692 transaction_guard & guard)
2693{
2694 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2695 i != armed_sessions.end(); ++i)
2696 {
2697 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2698 j = sessions.find(*i);
2699 if (j == sessions.end())
2700 continue;
2701 else
2702 {
2703 shared_ptr<session> sess = j->second;
2704 if (!sess->process(guard))
2705 {
2706 P(F("peer %s processing finished, disconnecting")
2707 % sess->peer_id);
2708 drop_session_associated_with_fd(sessions, *i);
2709 }
2710 }
2711 }
2712}
2713
2714static void
2715reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2716 unsigned long timeout_seconds)
2717{
2718 // Kill any clients which haven't done any i/o inside the timeout period
2719 // or who have exchanged all items and flushed their output buffers.
2720 set<Netxx::socket_type> dead_clients;
2721 time_t now = ::time(NULL);
2722 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2723 i = sessions.begin(); i != sessions.end(); ++i)
2724 {
2725 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2726 < static_cast<unsigned long>(now))
2727 {
2728 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2729 % i->first % i->second->peer_id);
2730 dead_clients.insert(i->first);
2731 }
2732 }
2733 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2734 i != dead_clients.end(); ++i)
2735 {
2736 drop_session_associated_with_fd(sessions, *i);
2737 }
2738}
2739
2740static void
2741serve_connections(protocol_role role,
2742 globish const & include_pattern,
2743 globish const & exclude_pattern,
2744 app_state & app,
2745 std::list<utf8> const & addresses,
2746 Netxx::port_type default_port,
2747 unsigned long timeout_seconds,
2748 unsigned long session_limit)
2749{
2750 Netxx::PipeCompatibleProbe probe;
2751
2752 Netxx::Timeout
2753 forever,
2754 timeout(static_cast<long>(timeout_seconds)),
2755 instant(0,1);
2756
2757#ifdef USE_IPV6
2758 bool use_ipv6=true;
2759#else
2760 bool use_ipv6=false;
2761#endif
2762 // This will be true when we try to bind while using IPv6. See comments
2763 // further down.
2764 bool try_again=false;
2765
2766 do
2767 {
2768 try
2769 {
2770 try_again = false;
2771
2772 Netxx::Address addr(use_ipv6);
2773
2774 if (addresses.empty())
2775 addr.add_all_addresses(default_port);
2776 else
2777 {
2778 for (std::list<utf8>::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
2779 {
2780 const utf8 & address = *it;
2781 if (!address().empty())
2782 {
2783 size_t l_colon = address().find(':');
2784 size_t r_colon = address().rfind(':');
2785
2786 if (l_colon == r_colon && l_colon == 0)
2787 {
2788 // can't be an IPv6 address as there is only one colon
2789 // must be a : followed by a port
2790 string port_str = address().substr(1);
2791 addr.add_all_addresses(std::atoi(port_str.c_str()));
2792 }
2793 else
2794 addr.add_address(address().c_str(), default_port);
2795 }
2796 }
2797 }
2798
2799 // If se use IPv6 and the initialisation of server fails, we want
2800 // to try again with IPv4. The reason is that someone may have
2801 // downloaded a IPv6-enabled monotone on a system that doesn't
2802 // have IPv6, and which might fail therefore.
2803 // On failure, Netxx::NetworkException is thrown, and we catch
2804 // it further down.
2805 try_again=use_ipv6;
2806
2807 Netxx::StreamServer server(addr, timeout);
2808
2809 // If we came this far, whatever we used (IPv6 or IPv4) was
2810 // accepted, so we don't need to try again any more.
2811 try_again=false;
2812
2813 const char *name = addr.get_name();
2814 P(F("beginning service on %s : %s")
2815 % (name != NULL ? name : _("<all interfaces>"))
2816 % lexical_cast<string>(addr.get_port()));
2817
2818 map<Netxx::socket_type, shared_ptr<session> > sessions;
2819 set<Netxx::socket_type> armed_sessions;
2820
2821 shared_ptr<transaction_guard> guard;
2822
2823 while (true)
2824 {
2825 probe.clear();
2826 armed_sessions.clear();
2827
2828 if (sessions.size() >= session_limit)
2829 W(F("session limit %d reached, some connections "
2830 "will be refused") % session_limit);
2831 else
2832 probe.add(server);
2833
2834 if (!guard)
2835 guard = shared_ptr<transaction_guard>(new transaction_guard(app.db));
2836
2837 I(guard);
2838
2839 while (!server_initiated_sync_requests.empty())
2840 {
2841 server_initiated_sync_request request
2842 = server_initiated_sync_requests.front();
2843 server_initiated_sync_requests.pop_front();
2844
2845 utf8 addr(request.address);
2846 globish inc(request.include);
2847 globish exc(request.exclude);
2848
2849 try
2850 {
2851 P(F("connecting to %s") % addr());
2852 shared_ptr<Netxx::StreamBase> server
2853 = build_stream_to_server(app, inc, exc,
2854 addr, default_port,
2855 timeout);
2856
2857 // 'false' here means not to revert changes when
2858 // the SockOpt goes out of scope.
2859 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2860 socket_options.set_non_blocking();
2861
2862 protocol_role role = source_and_sink_role;
2863 if (request.what == "sync")
2864 role = source_and_sink_role;
2865 else if (request.what == "push")
2866 role = source_role;
2867 else if (request.what == "pull")
2868 role = sink_role;
2869
2870 shared_ptr<session> sess(new session(role, client_voice,
2871 inc, exc,
2872 app, addr(), server, true));
2873
2874 sessions.insert(make_pair(server->get_socketfd(), sess));
2875 }
2876 catch (Netxx::NetworkException & e)
2877 {
2878 P(F("Network error: %s") % e.what());
2879 }
2880 }
2881
2882 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard);
2883
2884 L(FL("i/o probe with %d armed") % armed_sessions.size());
2885 Netxx::socket_type fd;
2886 Netxx::Timeout how_long;
2887 if (sessions.empty())
2888 how_long = forever;
2889 else if (armed_sessions.empty())
2890 how_long = timeout;
2891 else
2892 how_long = instant;
2893 do
2894 {
2895 Netxx::Probe::result_type res = probe.ready(how_long);
2896 how_long = instant;
2897 Netxx::Probe::ready_type event = res.second;
2898 fd = res.first;
2899
2900 if (fd == -1)
2901 {
2902 if (armed_sessions.empty())
2903 L(FL("timed out waiting for I/O (listening on %s : %s)")
2904 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2905 }
2906
2907 // we either got a new connection
2908 else if (fd == server)
2909 handle_new_connection(addr, server, timeout, role,
2910 include_pattern, exclude_pattern,
2911 sessions, app);
2912
2913 // or an existing session woke up
2914 else
2915 {
2916 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2917 i = sessions.find(fd);
2918 if (i == sessions.end())
2919 {
2920 L(FL("got woken up for action on unknown fd %d") % fd);
2921 }
2922 else
2923 {
2924 probe.remove(*(i->second->str));
2925 shared_ptr<session> sess = i->second;
2926 bool live_p = true;
2927
2928 try
2929 {
2930 if (event & Netxx::Probe::ready_read)
2931 handle_read_available(fd, sess, sessions,
2932 armed_sessions, live_p);
2933
2934 if (live_p && (event & Netxx::Probe::ready_write))
2935 handle_write_available(fd, sess, sessions, live_p);
2936 }
2937 catch (Netxx::Exception &)
2938 {
2939 P(F("Network error on peer %s, disconnecting")
2940 % sess->peer_id);
2941 drop_session_associated_with_fd(sessions, fd);
2942 }
2943 if (live_p && (event & Netxx::Probe::ready_oobd))
2944 {
2945 P(F("got OOB from peer %s, disconnecting")
2946 % sess->peer_id);
2947 drop_session_associated_with_fd(sessions, fd);
2948 }
2949 }
2950 }
2951 }
2952 while (fd != -1);
2953 process_armed_sessions(sessions, armed_sessions, *guard);
2954 reap_dead_sessions(sessions, timeout_seconds);
2955
2956 if (sessions.empty())
2957 {
2958 // Let the guard die completely if everything's gone quiet.
2959 guard->commit();
2960 guard.reset();
2961 }
2962 }
2963 }
2964 // This exception is thrown when bind() fails somewhere in Netxx.
2965 catch (Netxx::NetworkException &)
2966 {
2967 // If we tried with IPv6 and failed, we want to try again using IPv4.
2968 if (try_again)
2969 {
2970 use_ipv6 = false;
2971 }
2972 // In all other cases, just rethrow the exception.
2973 else
2974 throw;
2975 }
2976 // This exception is thrown when there is no support for the type of
2977 // connection we want to do in the kernel, for example when a socket()
2978 // call fails somewhere in Netxx.
2979 catch (Netxx::Exception &)
2980 {
2981 // If we tried with IPv6 and failed, we want to try again using IPv4.
2982 if (try_again)
2983 {
2984 use_ipv6 = false;
2985 }
2986 // In all other cases, just rethrow the exception.
2987 else
2988 throw;
2989 }
2990 }
2991 while(try_again);
2992 }
2993
2994static void
2995serve_single_connection(shared_ptr<session> sess,
2996 unsigned long timeout_seconds)
2997{
2998 Netxx::PipeCompatibleProbe probe;
2999
3000 Netxx::Timeout
3001 forever,
3002 timeout(static_cast<long>(timeout_seconds)),
3003 instant(0,1);
3004
3005 P(F("beginning service on %s") % sess->peer_id);
3006
3007 sess->begin_service();
3008
3009 transaction_guard guard(sess->app.db);
3010
3011 map<Netxx::socket_type, shared_ptr<session> > sessions;
3012 set<Netxx::socket_type> armed_sessions;
3013
3014 if (sess->str->get_socketfd() == -1)
3015 {
3016 // Unix pipes are non-duplex, have two filedescriptors
3017 shared_ptr<Netxx::PipeStream> pipe =
3018 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
3019 I(pipe);
3020 sessions[pipe->get_writefd()]=sess;
3021 sessions[pipe->get_readfd()]=sess;
3022 }
3023 else
3024 sessions[sess->str->get_socketfd()]=sess;
3025
3026 while (!sessions.empty())
3027 {
3028 probe.clear();
3029 armed_sessions.clear();
3030
3031 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, guard);
3032
3033 L(FL("i/o probe with %d armed") % armed_sessions.size());
3034 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
3035 : instant));
3036 Netxx::Probe::ready_type event = res.second;
3037 Netxx::socket_type fd = res.first;
3038
3039 if (fd == -1)
3040 {
3041 if (armed_sessions.empty())
3042 L(FL("timed out waiting for I/O (listening on %s)")
3043 % sess->peer_id);
3044 }
3045
3046 // an existing session woke up
3047 else
3048 {
3049 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3050 i = sessions.find(fd);
3051 if (i == sessions.end())
3052 {
3053 L(FL("got woken up for action on unknown fd %d") % fd);
3054 }
3055 else
3056 {
3057 shared_ptr<session> sess = i->second;
3058 bool live_p = true;
3059
3060 if (event & Netxx::Probe::ready_read)
3061 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3062
3063 if (live_p && (event & Netxx::Probe::ready_write))
3064 handle_write_available(fd, sess, sessions, live_p);
3065
3066 if (live_p && (event & Netxx::Probe::ready_oobd))
3067 {
3068 P(F("got some OOB data on fd %d (peer %s), disconnecting")
3069 % fd % sess->peer_id);
3070 drop_session_associated_with_fd(sessions, fd);
3071 }
3072 }
3073 }
3074 process_armed_sessions(sessions, armed_sessions, guard);
3075 reap_dead_sessions(sessions, timeout_seconds);
3076 }
3077}
3078
3079
3080void
3081insert_with_parents(revision_id rev,
3082 refiner & ref,
3083 revision_enumerator & rev_enumerator,
3084 set<revision_id> & revs,
3085 app_state & app,
3086 ticker & revisions_ticker)
3087{
3088 deque<revision_id> work;
3089 work.push_back(rev);
3090 while (!work.empty())
3091 {
3092 revision_id rid = work.front();
3093 work.pop_front();
3094
3095 if (!null_id(rid) && revs.find(rid) == revs.end())
3096 {
3097 revs.insert(rid);
3098 ++revisions_ticker;
3099 id rev_item;
3100 decode_hexenc(rid.inner(), rev_item);
3101 ref.note_local_item(rev_item);
3102 vector<revision_id> parents;
3103 rev_enumerator.get_revision_parents(rid, parents);
3104 for (vector<revision_id>::const_iterator i = parents.begin();
3105 i != parents.end(); ++i)
3106 {
3107 work.push_back(*i);
3108 }
3109 }
3110 }
3111}
3112
3113void
3114session::rebuild_merkle_trees(app_state & app,
3115 set<branch_name> const & branchnames)
3116{
3117 P(F("finding items to synchronize:"));
3118 for (set<branch_name>::const_iterator i = branchnames.begin();
3119 i != branchnames.end(); ++i)
3120 L(FL("including branch %s") % *i);
3121
3122 // xgettext: please use short message and try to avoid multibytes chars
3123 ticker revisions_ticker(N_("revisions"), "r", 64);
3124 // xgettext: please use short message and try to avoid multibytes chars
3125 ticker certs_ticker(N_("certificates"), "c", 256);
3126 // xgettext: please use short message and try to avoid multibytes chars
3127 ticker keys_ticker(N_("keys"), "k", 1);
3128
3129 set<revision_id> revision_ids;
3130 set<rsa_keypair_id> inserted_keys;
3131
3132 {
3133 for (set<branch_name>::const_iterator i = branchnames.begin();
3134 i != branchnames.end(); ++i)
3135 {
3136 // Get branch certs.
3137 vector< revision<cert> > certs;
3138 // FIXME_PROJECTS: probably something like
3139 // app.get_project(i->project).get_branch_certs(i->branch)
3140 // or so.
3141 app.get_project().get_branch_certs(*i, certs);
3142 for (vector< revision<cert> >::const_iterator j = certs.begin();
3143 j != certs.end(); j++)
3144 {
3145 revision_id rid(j->inner().ident);
3146 insert_with_parents(rid, rev_refiner, rev_enumerator,
3147 revision_ids, app, revisions_ticker);
3148 // Branch certs go in here, others later on.
3149 hexenc<id> tmp;
3150 id item;
3151 cert_hash_code(j->inner(), tmp);
3152 decode_hexenc(tmp, item);
3153 cert_refiner.note_local_item(item);
3154 rev_enumerator.note_cert(rid, tmp);
3155 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3156 inserted_keys.insert(j->inner().key);
3157 }
3158 }
3159 }
3160
3161 {
3162 map<branch_name, epoch_data> epochs;
3163 app.db.get_epochs(epochs);
3164
3165 epoch_data epoch_zero(string(constants::epochlen, '0'));
3166 for (set<branch_name>::const_iterator i = branchnames.begin();
3167 i != branchnames.end(); ++i)
3168 {
3169 branch_name const & branch(*i);
3170 map<branch_name, epoch_data>::const_iterator j;
3171 j = epochs.find(branch);
3172
3173 // Set to zero any epoch which is not yet set.
3174 if (j == epochs.end())
3175 {
3176 L(FL("setting epoch on %s to zero") % branch);
3177 epochs.insert(make_pair(branch, epoch_zero));
3178 app.db.set_epoch(branch, epoch_zero);
3179 }
3180
3181 // Then insert all epochs into merkle tree.
3182 j = epochs.find(branch);
3183 I(j != epochs.end());
3184 epoch_id eid;
3185 id epoch_item;
3186 epoch_hash_code(j->first, j->second, eid);
3187 decode_hexenc(eid.inner(), epoch_item);
3188 epoch_refiner.note_local_item(epoch_item);
3189 }
3190 }
3191
3192 {
3193 typedef vector< pair<hexenc<id>,
3194 pair<revision_id, rsa_keypair_id> > > cert_idx;
3195
3196 cert_idx idx;
3197 app.db.get_revision_cert_nobranch_index(idx);
3198
3199 // Insert all non-branch certs reachable via these revisions
3200 // (branch certs were inserted earlier).
3201
3202 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3203 {
3204 hexenc<id> const & hash = i->first;
3205 revision_id const & ident = i->second.first;
3206 rsa_keypair_id const & key = i->second.second;
3207
3208 rev_enumerator.note_cert(ident, hash);
3209
3210 if (revision_ids.find(ident) == revision_ids.end())
3211 continue;
3212
3213 id item;
3214 decode_hexenc(hash, item);
3215 cert_refiner.note_local_item(item);
3216 ++certs_ticker;
3217 if (inserted_keys.find(key) == inserted_keys.end())
3218 inserted_keys.insert(key);
3219 }
3220 }
3221
3222 // Add any keys specified on the command line.
3223 for (vector<rsa_keypair_id>::const_iterator key
3224 = app.opts.keys_to_push.begin();
3225 key != app.opts.keys_to_push.end(); ++key)
3226 {
3227 if (inserted_keys.find(*key) == inserted_keys.end())
3228 {
3229 if (!app.db.public_key_exists(*key))
3230 {
3231 if (app.keys.key_pair_exists(*key))
3232 app.keys.ensure_in_database(*key);
3233 else
3234 W(F("Cannot find key '%s'") % *key);
3235 }
3236 inserted_keys.insert(*key);
3237 }
3238 }
3239
3240 // Insert all the keys.
3241 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3242 key != inserted_keys.end(); key++)
3243 {
3244 if (app.db.public_key_exists(*key))
3245 {
3246 base64<rsa_pub_key> pub_encoded;
3247 app.db.get_key(*key, pub_encoded);
3248 hexenc<id> keyhash;
3249 key_hash_code(*key, pub_encoded, keyhash);
3250 L(FL("noting key '%s' = '%s' to send") % *key % keyhash);
3251 id key_item;
3252 decode_hexenc(keyhash, key_item);
3253 key_refiner.note_local_item(key_item);
3254 ++keys_ticker;
3255 }
3256 }
3257
3258 rev_refiner.reindex_local_items();
3259 cert_refiner.reindex_local_items();
3260 key_refiner.reindex_local_items();
3261 epoch_refiner.reindex_local_items();
3262}
3263
3264void
3265run_netsync_protocol(protocol_voice voice,
3266 protocol_role role,
3267 std::list<utf8> const & addrs,
3268 globish const & include_pattern,
3269 globish const & exclude_pattern,
3270 app_state & app)
3271{
3272 if (include_pattern().find_first_of("'\"") != string::npos)
3273 {
3274 W(F("include branch pattern contains a quote character:\n"
3275 "%s") % include_pattern());
3276 }
3277
3278 if (exclude_pattern().find_first_of("'\"") != string::npos)
3279 {
3280 W(F("exclude branch pattern contains a quote character:\n"
3281 "%s") % exclude_pattern());
3282 }
3283
3284 // We do not want to be killed by SIGPIPE from a network disconnect.
3285 ignore_sigpipe();
3286
3287 try
3288 {
3289 if (voice == server_voice)
3290 {
3291 if (app.opts.bind_stdio)
3292 {
3293 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3294 shared_ptr<session> sess(new session(role, server_voice,
3295 include_pattern, exclude_pattern,
3296 app, "stdio", str));
3297 serve_single_connection(sess,constants::netsync_timeout_seconds);
3298 }
3299 else
3300 serve_connections(role, include_pattern, exclude_pattern, app,
3301 addrs, static_cast<Netxx::port_type>(constants::netsync_default_port),
3302 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3303 static_cast<unsigned long>(constants::netsync_connection_limit));
3304 }
3305 else
3306 {
3307 I(voice == client_voice);
3308 call_server(role, include_pattern, exclude_pattern, app,
3309 addrs, static_cast<Netxx::port_type>(constants::netsync_default_port),
3310 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3311 }
3312 }
3313 catch (Netxx::NetworkException & e)
3314 {
3315 throw informative_failure((F("network error: %s") % e.what()).str());
3316 }
3317 catch (Netxx::Exception & e)
3318 {
3319 throw oops((F("network error: %s") % e.what()).str());;
3320 }
3321}
3322
3323// Local Variables:
3324// mode: C++
3325// fill-column: 76
3326// c-file-style: "gnu"
3327// indent-tabs-mode: nil
3328// End:
3329// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status