WvStreams
wvhttpstream.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
6  *
7  * See wvhttppool.h.
8  */
9 #include "wvhttppool.h"
10 #include "wvtcp.h"
11 #include "wvsslstream.h"
12 #include "wvbuf.h"
13 #include "wvbase64.h"
14 #include "strutils.h"
15 #ifdef HAVE_EXECINFO_H
16 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
17 #endif
18 
19 #ifdef _WIN32
20 #define ETIMEDOUT WSAETIMEDOUT
21 #endif
22 
23 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
24  bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
25  : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
26  pipeline_incompatible(_pipeline_incompatible),
27  in_doneurl(false)
28 {
29  log("Opening server connection.\n");
30  http_response = "";
31  encoding = Unknown;
32  bytes_remaining = 0;
33  in_chunk_trailer = false;
34  pipeline_test_count = 0;
35  last_was_pipeline_test = false;
36 
37  enable_pipelining = global_enable_pipelining
38  && !pipeline_incompatible[target.remaddr];
39  ssl = _ssl;
40 
41  if (ssl)
42  cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
43 
44  sent_url_request = false;
45 
46  alarm(60000); // timeout if no connection, or something goes wrong
47 }
48 
49 
50 WvHttpStream::~WvHttpStream()
51 {
52  log(WvLog::Debug2, "Deleting.\n");
53 
54 #if 0
55 #ifdef HAVE_EXECINFO_H
56  void* trace[10];
57  int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
58  char** tracedump = backtrace_symbols(trace, count);
59  log(WvLog::Debug, "TRACE");
60  for (int i = 0; i < count; ++i)
61  log(WvLog::Debug, ":%s", tracedump[i]);
62  log(WvLog::Debug, "\n");
63  free(tracedump);
64 #endif
65 #endif
66 
67  if (geterr())
68  log("Error was: %s\n", errstr());
69  close();
70 }
71 
72 
74 {
75  log("close called\n");
76 
77 #if 0
78 #ifdef HAVE_EXECINFO_H
79  void *trace[10];
80  int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
81  char** tracedump = backtrace_symbols(trace, count);
82  log(WvLog::Debug, "TRACE");
83  for (int i = 0; i < count; ++i)
84  log(WvLog::Debug, ":%s", tracedump[i]);
85  log(WvLog::Debug, "\n");
86  free(tracedump);
87 #endif
88 #endif
89 
90  // assume pipelining is broken if we're closing without doing at least
91  // one successful pipelining test and a following non-test request.
92  if (enable_pipelining && max_requests > 1
93  && (pipeline_test_count < 1
94  || (pipeline_test_count == 1 && last_was_pipeline_test)))
95  pipelining_is_broken(2);
96 
97  if (isok())
98  log("Closing.\n");
100 
101  if (geterr())
102  {
103  // if there was an error, count the first URL as done. This prevents
104  // retrying indefinitely.
105  WvUrlRequest *msgurl = curl;
106  if (!msgurl && !urls.isempty())
107  msgurl = urls.first();
108  if (!msgurl && !waiting_urls.isempty())
109  msgurl = waiting_urls.first();
110 
111  if (msgurl)
112  {
113  log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
114  errstr());
115  curl = msgurl;
116  doneurl();
117  }
118  }
119  waiting_urls.zap();
120  if (curl)
121  {
122  log("curl is %s\n", curl->url);
123  doneurl();
124  }
125  log("close done\n");
126 }
127 
128 
129 void WvHttpStream::doneurl()
130 {
131  // There is a slight chance that we might receive an error during or just before
132  // this function is called, which means that the write occuring during
133  // start_pipeline_test() would be called, which would call close() because of the
134  // error, which would call doneurl() again. We don't want to execute doneurl()
135  // a second time when we're already in the middle.
136  if (in_doneurl)
137  return;
138  in_doneurl = true;
139 
140  assert(curl != NULL);
141  WvString last_response(http_response);
142  log("Done URL: %s\n", curl->url);
143 
144  http_response = "";
145  encoding = Unknown;
146  in_chunk_trailer = false;
147  bytes_remaining = 0;
148 
149  last_was_pipeline_test = curl->pipeline_test;
150  bool broken = false;
151  if (last_was_pipeline_test)
152  {
153  pipeline_test_count++;
154  if (pipeline_test_count == 1)
155  start_pipeline_test(&curl->url);
156  else if (pipeline_test_response != last_response)
157  {
158  // getting a bit late in the game to be detecting brokenness :(
159  // However, if the response code isn't the same for both tests,
160  // something's definitely screwy.
161  pipelining_is_broken(4);
162  broken = true;
163  }
164  pipeline_test_response = last_response;
165  }
166 
167  assert(curl == urls.first());
168  curl->done();
169  curl = NULL;
170  sent_url_request = false;
171  urls.unlink_first();
172 
173  if (broken)
174  close();
175 
176  request_next();
177  in_doneurl = false;
178 }
179 
180 
181 static WvString encode64(WvStringParm user, WvStringParm password)
182 {
183  WvBase64Encoder encoder;
184  WvString ret;
185  encoder.flushstrstr(WvString("%s:%s", user, password), ret);
186  return ret;
187 }
188 
189 
190 static WvString fixnl(WvStringParm nonl)
191 {
192  WvDynBuf b;
193  const char *cptr;
194 
195  for (cptr = nonl; cptr && *cptr; cptr++)
196  {
197  if (*cptr == '\r')
198  continue;
199  else if (*cptr == '\n')
200  b.put("\r", 1); // put BOTH \r and \n
201  b.put(cptr, 1);
202  }
203 
204  return b.getstr();
205 }
206 
207 
208 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
209 {
210  WvString request;
211  WvString auth("");
212  if(!!url->url.getuser() && !!url->url.getpassword())
213  auth = WvString("Authorization: Basic %s\n",
214  encode64(url->url.getuser(), url->url.getpassword()));
215 
216  request = fixnl(
217  WvString(
218  "%s %s HTTP/1.1\n"
219  "Host: %s:%s\n"
220  "Connection: %s\n"
221  "%s"
222  "%s"
223  "%s%s"
224  "\n",
225  url->method,
226  url->url.getfile(),
227  url->url.gethost(), url->url.getport(),
228  keepalive ? "keep-alive" : "close",
229  auth,
230  (putstream_data.used() > 0 ? WvString(
231  "Content-Length: %s\n", putstream_data.used()) : ""),
232  trim_string(url->headers.edit()),
233  !!url->headers ? "\n" : ""));
234  return request;
235 }
236 
237 
238 void WvHttpStream::send_request(WvUrlRequest *url)
239 {
240  request_count++;
241  log("Request #%s: %s\n", request_count, url->url);
242  write(request_str(url, url->pipeline_test
243  || request_count < max_requests));
244  write(putstream_data);
245  sent_url_request = true;
246  alarm(60000);
247 }
248 
249 
250 void WvHttpStream::start_pipeline_test(WvUrl *url)
251 {
252  WvUrl location(WvString(
253  "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
254  url->getproto(), url->gethost(), url->getport()));
255  WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
256  false, true);
257  testurl->instream = this;
258  send_request(testurl);
259  urls.append(testurl, true, "sent_running_url");
260 }
261 
262 
263 void WvHttpStream::request_next()
264 {
265  // Clear the putstream buffer before we start any new requests
266  putstream_data.zap();
267 
268  // don't do a request if we've done too many already or we have none
269  // waiting.
270  if (request_count >= max_requests || waiting_urls.isempty())
271  return;
272 
273  // don't do more than one request at a time if we're not pipelining.
274  if (!enable_pipelining && !urls.isempty())
275  return;
276 
277  // okay then, we really do want to send a new request.
278  WvUrlRequest *url = waiting_urls.first();
279 
280  waiting_urls.unlink_first();
281  if (!url->putstream)
282  {
283  if (enable_pipelining && !request_count && max_requests > 1)
284  start_pipeline_test(&url->url);
285  send_request(url);
286  }
287  urls.append(url, false, "sent_running_url");
288 }
289 
290 
291 void WvHttpStream::pipelining_is_broken(int why)
292 {
293  if (!pipeline_incompatible[target.remaddr])
294  {
295  pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
296  log("Pipelining is broken on this server (%s)! Disabling.\n", why);
297  }
298 }
299 
300 
302 {
303  SelectRequest oldwant = si.wants;
304  WvUrlRequest *url;
305 
307 
308  if (!urls.isempty())
309  {
310  url = urls.first();
311  if(url && url->putstream)
312  url->putstream->pre_select(si);
313  }
314 
315  si.wants = oldwant;
316 }
317 
318 
320 {
321  SelectRequest oldwant = si.wants;
322  WvUrlRequest *url;
323 
324  if (WvUrlStream::post_select(si))
325  return true;
326 
327  if (!urls.isempty())
328  {
329  url = urls.first();
330  if(url && url->putstream && url->putstream->post_select(si))
331  return true;
332  }
333 
334  si.wants = oldwant;
335  return false;
336 }
337 
338 
340 {
341  char buf[1024], *line;
342  size_t len;
343 
345 
346  // make connections timeout after some idleness
347  if (alarm_was_ticking)
348  {
349  log(WvLog::Debug4, "urls count: %s\n", urls.count());
350  if (!urls.isempty())
351  {
352  seterr(ETIMEDOUT);
353 
354  // Must check again here since seterr()
355  // will close our stream and if we only
356  // had one url then it'll be gone.
357  if (!urls.isempty())
358  {
359  WvUrlRequest *url = urls.first();
360  if (url->outstream)
361  url->outstream->seterr(ETIMEDOUT);
362  }
363  }
364  else
365  close(); // timed out, but not really an error
366  return;
367  }
368 
369  // Die if somebody closed our outstream. This is so that if we were
370  // downloading a really big file, they can stop it in the middle and
371  // our next url request can start downloading immediately.
372  if (curl && !curl->outstream)
373  {
374  if (!(encoding == PostHeadInfinity
375  || encoding == PostHeadChunked
376  || encoding == PostHeadStream))
377  {
378  // don't complain about pipelining failures
379  pipeline_test_count++;
380  last_was_pipeline_test = false;
381  close();
382  }
383 
384  if (curl)
385  doneurl();
386  return;
387  }
388  else if (curl)
389  curl->inuse = true;
390 
391  if(!sent_url_request && !urls.isempty())
392  {
393  WvUrlRequest *url = urls.first();
394  if(url)
395  {
396  if(url->putstream)
397  {
398  int len = 0;
399  if(url->putstream->isok())
400  len = url->putstream->read(putstream_data, 1024);
401 
402  if(!url->putstream->isok() || len == 0)
403  {
404  url->putstream = NULL;
405  send_request(url);
406  }
407  }
408  }
409  }
410 
411  if (!curl)
412  {
413  // in the header section
414  line = getline();
415  if (line)
416  {
417  line = trim_string(line);
418  log(WvLog::Debug4, "Header: '%s'\n", line);
419  if (!http_response)
420  {
421  http_response = line;
422 
423  // there are never two pipeline test requests in a row, so
424  // a second response string exactly like the pipeline test
425  // response implies that everything between the first and
426  // second test requests was lost: bad!
427  if (last_was_pipeline_test
428  && http_response == pipeline_test_response)
429  {
430  pipelining_is_broken(1);
431  close();
432  return;
433  }
434 
435  // http response #400 is "invalid request", which we
436  // shouldn't be sending. If we get one of these right after
437  // a test, it probably means the stuff that came after it
438  // was mangled in some way during transmission ...and we
439  // should throw it away.
440  if (last_was_pipeline_test && !!http_response)
441  {
442  const char *cptr = strchr(http_response, ' ');
443  if (cptr && atoi(cptr+1) == 400)
444  {
445  pipelining_is_broken(3);
446  close();
447  return;
448  }
449  }
450  }
451 
452  if (urls.isempty())
453  {
454  log("got unsolicited data.\n");
455  seterr("unsolicited data from server!");
456  return;
457  }
458 
459  if (!strncasecmp(line, "Content-length: ", 16))
460  {
461  bytes_remaining = atoi(line+16);
462  encoding = ContentLength;
463  }
464  else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
465  && strstr(line+19, "chunked"))
466  {
467  encoding = Chunked;
468  }
469 
470  if (line[0])
471  {
472  char *p;
473  WvBufUrlStream *outstream = urls.first()->outstream;
474 
475  if ((p = strchr(line, ':')) != NULL)
476  {
477  *p = 0;
478  p = trim_string(p+1);
479  if (outstream) {
480  struct WvHTTPHeader *h;
481  h = new struct WvHTTPHeader(line, p);
482  outstream->headers.add(h, true);
483  }
484  }
485  else if (strncasecmp(line, "HTTP/", 5) == 0)
486  {
487  char *p = strchr(line, ' ');
488  if (p)
489  {
490  *p = 0;
491  if (outstream)
492  {
493  outstream->version = line+5;
494  outstream->status = atoi(p+1);
495  }
496  }
497  }
498  }
499  else
500  {
501  // blank line is the beginning of data section
502  curl = urls.first();
503  in_chunk_trailer = false;
504  log(WvLog::Debug4,
505  "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
506 
507  if (encoding == Unknown)
508  encoding = Infinity; // go until connection closes itself
509 
510  if (curl->method == "HEAD")
511  {
512  log("Got all headers.\n");
513  if (!enable_pipelining)
514  doneurl();
515 
516  if (encoding == Infinity)
517  encoding = PostHeadInfinity;
518  else if (encoding == Chunked)
519  encoding = PostHeadChunked;
520  else
521  encoding = PostHeadStream;
522  }
523  }
524  }
525  }
526  else if (encoding == PostHeadInfinity
527  || encoding == PostHeadChunked
528  || encoding == PostHeadStream)
529  {
530  WvDynBuf chkbuf;
531  len = read(chkbuf, 5);
532 
533  // If there is more data available right away, and it isn't an
534  // HTTP header from another request, then it's a stupid web
535  // server that likes to send bodies with HEAD requests.
536  if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
537  "HTTP/", 5))
538  {
539  if (encoding == PostHeadInfinity)
540  encoding = ChuckInfinity;
541  else if (encoding == PostHeadChunked)
542  encoding = ChuckChunked;
543  else if (encoding == PostHeadStream)
544  encoding = ChuckStream;
545  else
546  log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
547  }
548  else
549  doneurl();
550 
551  unread(chkbuf, len);
552  }
553  else if (encoding == ChuckInfinity)
554  {
555  len = read(buf, sizeof(buf));
556  if (len)
557  log(WvLog::Debug5, "Chucking %s bytes.\n", len);
558  if (!isok())
559  doneurl();
560  }
561  else if (encoding == ChuckChunked && !bytes_remaining)
562  {
563  encoding = Chunked;
564  }
565  else if (encoding == ChuckChunked || encoding == ChuckStream)
566  {
567  if (bytes_remaining > sizeof(buf))
568  len = read(buf, sizeof(buf));
569  else
570  len = read(buf, bytes_remaining);
571  bytes_remaining -= len;
572  if (len)
573  log(WvLog::Debug5,
574  "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
575  if (!bytes_remaining && encoding == ContentLength)
576  doneurl();
577  if (bytes_remaining && !isok())
578  seterr("connection interrupted");
579  }
580  else if (encoding == Chunked && !bytes_remaining)
581  {
582  line = getline();
583  if (line)
584  {
585  line = trim_string(line);
586 
587  if (in_chunk_trailer)
588  {
589  // in the trailer section of a chunked encoding
590  log(WvLog::Debug4, "Trailer: '%s'\n", line);
591 
592  // a blank line means we're finally done!
593  if (!line[0])
594  doneurl();
595  }
596  else
597  {
598  // in the "length line" section of a chunked encoding
599  if (line[0])
600  {
601  bytes_remaining = (size_t)strtoul(line, NULL, 16);
602  if (!bytes_remaining)
603  in_chunk_trailer = true;
604  log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
605  bytes_remaining, line);
606  }
607  }
608  }
609  }
610  else if (encoding == Infinity)
611  {
612  // just read data until the connection closes, and assume all was
613  // well. It sucks, but there's no way to tell if all the data arrived
614  // okay... that's why Chunked or ContentLength encoding is better.
615  len = read(buf, sizeof(buf));
616  if (!isok())
617  return;
618 
619  if (len)
620  log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
621  if (curl && curl->outstream)
622  curl->outstream->write(buf, len);
623 
624  if (!isok() && curl)
625  doneurl();
626  }
627  else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
628  {
629  // in the data section of a chunked or content-length encoding,
630  // with 'bytes_remaining' bytes of data left.
631 
632  if (bytes_remaining > sizeof(buf))
633  len = read(buf, sizeof(buf));
634  else
635  len = read(buf, bytes_remaining);
636  if (!isok())
637  return;
638 
639  bytes_remaining -= len;
640  if (len)
641  log(WvLog::Debug5,
642  "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
643  if (curl && curl->outstream)
644  curl->outstream->write(buf, len);
645 
646  if (!bytes_remaining && encoding == ContentLength && curl)
647  doneurl();
648 
649  if (bytes_remaining && !isok())
650  seterr("connection interrupted");
651 
652  if (!isok())
653  doneurl();
654  }
655 
656  if (urls.isempty())
657  alarm(5000); // just wait a few seconds before closing connection
658  else
659  alarm(60000); // give the server a minute to respond, if we're waiting
660 }
A base 64 encoder.
Definition: wvbase64.h:21
const T * peek(int offset, size_t count)
Returns a const pointer into the buffer at the specified offset to the specified number of elements w...
Definition: wvbufbase.h:225
void zap()
Clears the buffer.
Definition: wvbufbase.h:257
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
WvString getstr()
Returns the entire buffer as a null-terminated WvString.
Definition: wvbuffer.cc:17
bool flushstrstr(WvStringParm instr, WvString &outstr, bool finish=false)
Flushes data through the encoder from a string to a string.
Definition: wvencoder.cc:86
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition: wvstring.h:94
Base class for streams built on Unix file descriptors.
Definition: wvfdstream.h:21
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual void close()
Close this stream.
Definition: wvhttpstream.cc:73
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is define...
An IP+Port address also includes a port number, with the resulting form www.xxx.yyy....
Definition: wvaddr.h:394
SSL Stream, handles SSLv2, SSLv3, and TLS Methods - If you want it to be a server,...
Definition: wvsslstream.h:36
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
virtual void close()
Close this stream.
virtual bool isok() const
return true if the stream is actually usable right now
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is define...
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstream.cc:875
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstream.cc:445
void alarm(time_t msec_timeout)
set an alarm, i.e.
Definition: wvstream.cc:1049
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvstream.cc:844
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition: wvstream.cc:1190
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition: wvstream.h:54
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:330
char * edit()
make the string editable, and return a non-const (char*)
Definition: wvstring.h:397
Definition: wvurl.h:17
the data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition: iwvstream.h:34
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition: strutils.cc:59