00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "wvstream.h"
00012 #include "wvtask.h"
00013 #include <time.h>
00014 #include <sys/types.h>
00015 #include <errno.h>
00016 #include <assert.h>
00017
00018
00019
00020 #if 0
00021 # define TRACE(x, y...) fprintf(stderr, x, ## y)
00022 #else
00023 # define TRACE(x, y...)
00024 #endif
00025
00026 WvTaskMan *WvStream::taskman;
00027
00028 static void normalize(struct timeval &tv)
00029 {
00030 tv.tv_sec += tv.tv_usec / 1000000;
00031 tv.tv_usec %= 1000000;
00032 }
00033
00034
00035 WvStream::WvStream(int _fd) : callfunc(NULL)
00036 {
00037 init();
00038 rwfd = _fd;
00039 }
00040
00041
00042 void WvStream::init()
00043 {
00044 wvstream_execute_called = false;
00045 userdata = NULL;
00046 errnum = 0;
00047 max_outbuf_size = 0;
00048 outbuf_delayed_flush = alarm_was_ticking = false;
00049 force.readable = true;
00050 force.writable = force.isexception = false;
00051 read_requires_writable = write_requires_readable = NULL;
00052 running_callback = false;
00053 queue_min = 0;
00054 autoclose_time = 0;
00055 alarm_time.tv_sec = alarm_time.tv_usec = 0;
00056
00057
00058 uses_continue_select = false;
00059 personal_stack_size = 65536;
00060 task = NULL;
00061 }
00062
00063
00064 WvStream::~WvStream()
00065 {
00066 TRACE("destroying %p\n", this);
00067 if (running_callback)
00068 {
00069
00070 TRACE("eek! destroying while running_callback!\n");
00071 assert(!running_callback);
00072 }
00073 close();
00074
00075 if (task)
00076 {
00077 while (task->isrunning())
00078 taskman->run(*task);
00079 task->recycle();
00080 task = NULL;
00081 }
00082 TRACE("done destroying %p\n", this);
00083 }
00084
00085
00086 void WvStream::close()
00087 {
00088 int rfd = getrfd(), wfd = getwfd();
00089
00090 flush(2000);
00091 if (rfd >= 0)
00092 ::close(getrfd());
00093 if (wfd >= 0 && wfd != rfd)
00094 ::close(getwfd());
00095 rwfd = -1;
00096 }
00097
00098
00099 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00100 {
00101 WvStream &s2 = *(WvStream *)userdata;
00102 char buf[1024];
00103 size_t len;
00104
00105 len = s.read(buf, sizeof(buf));
00106 s2.write(buf, len);
00107 }
00108
00109
00110
00111
00112
00113 void WvStream::_callback(void *stream)
00114 {
00115 WvStream *s = (WvStream *)stream;
00116
00117 s->running_callback = true;
00118
00119 s->wvstream_execute_called = false;
00120 s->execute();
00121 if (s->callfunc)
00122 s->callfunc(*s, s->userdata);
00123
00124
00125
00126
00127
00128
00129 assert(s->wvstream_execute_called);
00130
00131 s->running_callback = false;
00132 }
00133
00134
00135 void WvStream::callback()
00136 {
00137 TRACE("(?)");
00138
00139
00140
00141 if (running_callback)
00142 return;
00143
00144
00145 if (alarm_remaining() == 0)
00146 {
00147 alarm_time.tv_sec = alarm_time.tv_usec = 0;
00148 alarm_was_ticking = true;
00149 }
00150 else
00151 alarm_was_ticking = false;
00152
00153 assert(!uses_continue_select || personal_stack_size >= 1024);
00154
00155
00156 if (uses_continue_select && personal_stack_size >= 1024)
00157 {
00158 if (!taskman)
00159 taskman = new WvTaskMan;
00160
00161 if (!task)
00162 {
00163 TRACE("(!)");
00164 task = taskman->start("streamexec", _callback, this,
00165 personal_stack_size);
00166 }
00167 else if (!task->isrunning())
00168 {
00169 TRACE("(.)");
00170 fflush(stderr);
00171 task->start("streamexec2", _callback, this);
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192 do
00193 {
00194 taskman->run(*task);
00195 } while (task && task->isrunning() && running_callback);
00196 }
00197 else
00198 _callback(this);
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214 }
00215
00216
00217 void WvStream::execute()
00218 {
00219
00220 wvstream_execute_called = true;
00221 }
00222
00223
00224
00225 int WvStream::getrfd() const
00226 {
00227 return rwfd;
00228 }
00229
00230
00231
00232 int WvStream::getwfd() const
00233 {
00234 return rwfd;
00235 }
00236
00237
00238 int WvStream::getfd() const
00239 {
00240 int rfd = getrfd(), wfd = getwfd();
00241 assert(rfd == wfd);
00242 return rfd;
00243 }
00244
00245
00246 bool WvStream::isok() const
00247 {
00248 return (getrfd() != -1) && (getwfd() != -1);
00249 }
00250
00251
00252 void WvStream::seterr(int _errnum)
00253 {
00254 if (!errnum)
00255 errnum = _errnum;
00256 close();
00257 }
00258
00259
00260 void WvStream::seterr(const WvString &specialerr)
00261 {
00262 if (!errnum)
00263 {
00264 errstring = specialerr;
00265 errnum = -1;
00266 }
00267 close();
00268 }
00269
00270
00271 int WvStream::geterr() const
00272 {
00273 return errnum;
00274 }
00275
00276
00277 const char *WvStream::errstr() const
00278 {
00279 if (errnum == -1)
00280 {
00281 assert(errstring);
00282 return errstring;
00283 }
00284 else
00285 return strerror(errnum);
00286 }
00287
00288
00289 size_t WvStream::read(void *buf, size_t count)
00290 {
00291 size_t bufu = inbuf.used(), i;
00292 unsigned char *newbuf;
00293
00294 bufu = inbuf.used();
00295 if (bufu < queue_min)
00296 {
00297 newbuf = inbuf.alloc(queue_min - bufu);
00298 i = uread(newbuf, queue_min - bufu);
00299 inbuf.unalloc(queue_min - bufu - i);
00300
00301 bufu = inbuf.used();
00302 }
00303
00304 if (bufu < queue_min)
00305 return 0;
00306
00307
00308 if (!bufu)
00309 bufu = uread(buf, count);
00310 else
00311 {
00312
00313 if (bufu > count)
00314 bufu = count;
00315
00316 memcpy(buf, inbuf.get(bufu), bufu);
00317 }
00318
00319 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00320 return bufu;
00321 }
00322
00323
00324 size_t WvStream::uread(void *buf, size_t count)
00325 {
00326 if (!isok() || !buf || !count) return 0;
00327
00328 int in = ::read(getrfd(), buf, count);
00329
00330 if (in < 0 && (errno==EINTR || errno==EAGAIN || errno==ENOBUFS))
00331 return 0;
00332
00333 if (in < 0 || (count && in==0))
00334 {
00335 seterr(in < 0 ? errno : 0);
00336 return 0;
00337 }
00338
00339 return in;
00340 }
00341
00342
00343 size_t WvStream::write(const void *buf, size_t count)
00344 {
00345 if (!isok() || !buf || !count) return 0;
00346
00347 size_t wrote = 0;
00348
00349 if (!outbuf_delayed_flush && outbuf.used())
00350 flush(0);
00351
00352 if (!outbuf_delayed_flush && !outbuf.used())
00353 wrote = uwrite(buf, count);
00354
00355 if (max_outbuf_size && (outbuf.used() > max_outbuf_size))
00356 return wrote;
00357
00358 outbuf.put((unsigned char *)buf + wrote, count - wrote);
00359
00360
00361
00362 return count;
00363 }
00364
00365
00366 size_t WvStream::uwrite(const void *buf, size_t count)
00367 {
00368 if (!isok() || !buf || !count) return 0;
00369
00370 int out = ::write(getwfd(), buf, count);
00371
00372 if (out < 0 && (errno == ENOBUFS || errno==EAGAIN))
00373 return 0;
00374
00375 if (out < 0 || (count && out==0))
00376 {
00377 seterr(out < 0 ? errno : 0);
00378 return 0;
00379 }
00380
00381
00382 return out;
00383 }
00384
00385
00386
00387
00388 char *WvStream::getline(time_t wait_msec, char separator)
00389 {
00390 size_t i;
00391 unsigned char *buf;
00392
00393
00394
00395 while (isok())
00396 {
00397 queuemin(0);
00398
00399
00400 if ((i = inbuf.strchr(separator)) > 0)
00401 {
00402 buf = inbuf.get(i);
00403 buf[i-1] = 0;
00404 return (char *)buf;
00405 }
00406 else if (!isok())
00407 {
00408 if (inbuf.used())
00409 {
00410
00411 inbuf.alloc(1)[0] = 0;
00412 return (char *)inbuf.get(inbuf.used());
00413 }
00414 else
00415 return NULL;
00416 }
00417
00418
00419 if (inbuf.used())
00420 queuemin(inbuf.used() + 1);
00421
00422
00423
00424 if (uses_continue_select)
00425 {
00426 if (!continue_select(wait_msec) && isok() && wait_msec >= 0)
00427 return NULL;
00428 }
00429 else
00430 {
00431 if (!select(wait_msec) && isok() && wait_msec >= 0)
00432 return NULL;
00433 }
00434
00435 if (!isok())
00436 return NULL;
00437
00438
00439 buf = inbuf.alloc(1024);
00440 i = uread(buf, 1024);
00441 inbuf.unalloc(1024 - i);
00442 }
00443
00444
00445 return NULL;
00446 }
00447
00448
00449 void WvStream::drain()
00450 {
00451 char buf[1024];
00452 while (select(0))
00453 read(buf, sizeof(buf));
00454 }
00455
00456
00457 void WvStream::flush(time_t msec_timeout)
00458 {
00459 size_t attempt, real;
00460
00461
00462
00463 if (!isok()) return;
00464
00465 while (isok() && outbuf.used())
00466 {
00467 attempt = outbuf.used();
00468 if (attempt > 1400)
00469 attempt = 1400;
00470 real = uwrite(outbuf.get(attempt), attempt);
00471 if (real < attempt)
00472 outbuf.unget(attempt - real);
00473
00474
00475
00476 if (!msec_timeout || !select(msec_timeout, false, true))
00477 {
00478 if (msec_timeout >= 0)
00479 break;
00480 }
00481 }
00482
00483 if (autoclose_time)
00484 {
00485 time_t now = time(NULL);
00486 TRACE("Autoclose enabled for 0x%08X - now-time=%ld, buf %d bytes\n",
00487 (unsigned int)this, now - autoclose_time, outbuf.used());
00488 if (!outbuf.used() || now > autoclose_time)
00489 {
00490 autoclose_time = 0;
00491 close();
00492 }
00493 }
00494 }
00495
00496
00497 void WvStream::flush_then_close(int msec_timeout)
00498 {
00499 time_t now = time(NULL);
00500 autoclose_time = now + (msec_timeout + 999) / 1000;
00501
00502 TRACE("Autoclose SETUP for 0x%08X - buf %d bytes, timeout %ld sec\n",
00503 (unsigned int)this, outbuf.used(), autoclose_time - now);
00504
00505
00506
00507
00508
00509
00510 flush(0);
00511 }
00512
00513
00514 bool WvStream::pre_select(SelectInfo &si)
00515 {
00516 int rfd, wfd;
00517
00518 time_t alarmleft = alarm_remaining();
00519
00520 if (alarmleft == 0)
00521 return true;
00522
00523
00524 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00525 return true;
00526
00527 rfd = getrfd();
00528 wfd = getwfd();
00529
00530 if (si.wants.readable && (rfd >= 0))
00531 FD_SET(rfd, &si.read);
00532 if ((si.wants.writable || outbuf.used() || autoclose_time) && (wfd >= 0))
00533 FD_SET(wfd, &si.write);
00534 if (si.wants.isexception)
00535 {
00536 if (rfd >= 0) FD_SET(rfd, &si.except);
00537 if (wfd >= 0) FD_SET(wfd, &si.except);
00538 }
00539
00540 if (si.max_fd < rfd)
00541 si.max_fd = rfd;
00542 if (si.max_fd < wfd)
00543 si.max_fd = wfd;
00544
00545 if (alarmleft >= 0
00546 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00547 si.msec_timeout = alarmleft;
00548
00549 return false;
00550 }
00551
00552
00553 bool WvStream::post_select(SelectInfo &si)
00554 {
00555 size_t outbuf_used = outbuf.used();
00556 int rfd = getrfd(), wfd = getwfd();
00557 bool val;
00558
00559
00560 if (wfd >= 0
00561 && (outbuf_used || autoclose_time)
00562 && FD_ISSET(wfd, &si.write))
00563 {
00564 flush(0);
00565
00566
00567 if (!isok()) return false;
00568 }
00569
00570 val = ((rfd >= 0 && FD_ISSET(rfd, &si.read)) ||
00571 (wfd >= 0 && FD_ISSET(wfd, &si.write)) ||
00572 (rfd >= 0 && FD_ISSET(rfd, &si.except)) ||
00573 (wfd >= 0 && FD_ISSET(wfd, &si.except)));
00574
00575 if (val && si.wants.readable && read_requires_writable
00576 && !read_requires_writable->select(0, false, true))
00577 return false;
00578 if (val && si.wants.writable && write_requires_readable
00579 && !write_requires_readable->select(0, true, false))
00580 return false;
00581
00582 return val;
00583 }
00584
00585
00586 bool WvStream::_select(time_t msec_timeout,
00587 bool readable, bool writable, bool isexcept,
00588 bool forceable)
00589 {
00590 bool sure;
00591 int sel;
00592 timeval tv;
00593 SelectInfo si;
00594
00595 if (!isok()) return false;
00596
00597 FD_ZERO(&si.read);
00598 FD_ZERO(&si.write);
00599 FD_ZERO(&si.except);
00600
00601 if (forceable)
00602 si.wants = force;
00603 else
00604 {
00605 si.wants.readable = readable;
00606 si.wants.writable = writable;
00607 si.wants.isexception = isexcept;
00608 }
00609
00610 si.max_fd = -1;
00611 si.msec_timeout = msec_timeout;
00612 si.inherit_request = !forceable;
00613
00614 sure = pre_select(si);
00615
00616 if (sure)
00617 {
00618 si.msec_timeout = 0;
00619 tv.tv_sec = tv.tv_usec = 0;
00620 }
00621 else
00622 {
00623 tv.tv_sec = si.msec_timeout / 1000;
00624 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00625 }
00626
00627 sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00628 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00629
00630 if (sel < 0)
00631 {
00632 if (errno!=EAGAIN && errno!=EINTR && errno!=ENOBUFS)
00633 seterr(errno);
00634 return sure;
00635 }
00636
00637 if (!sel)
00638 return sure;
00639
00640 return isok() && post_select(si);
00641 }
00642
00643
00644 void WvStream::force_select(bool readable, bool writable, bool isexception)
00645 {
00646 force.readable |= readable;
00647 force.writable |= writable;
00648 force.isexception |= isexception;
00649 }
00650
00651
00652 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00653 {
00654 force.readable &= !readable;
00655 force.writable &= !writable;
00656 force.isexception &= !isexception;
00657 }
00658
00659
00660 void WvStream::alarm(time_t msec_timeout)
00661 {
00662 struct timezone tz;
00663
00664 if (msec_timeout >= 0)
00665 {
00666 gettimeofday(&alarm_time, &tz);
00667 alarm_time.tv_sec += msec_timeout / 1000;
00668 alarm_time.tv_usec += (msec_timeout % 1000) * 1000;
00669 normalize(alarm_time);
00670 }
00671 else
00672 {
00673
00674 alarm_time.tv_sec = alarm_time.tv_usec = 0;
00675 }
00676 }
00677
00678
00679 time_t WvStream::alarm_remaining()
00680 {
00681 struct timeval &a = alarm_time;
00682
00683 if (a.tv_sec)
00684 {
00685 struct timeval tv;
00686 struct timezone tz;
00687
00688 gettimeofday(&tv, &tz);
00689 normalize(tv);
00690
00691 if (a.tv_sec < tv.tv_sec
00692 || ( a.tv_sec == tv.tv_sec
00693 && a.tv_usec <= tv.tv_usec))
00694 {
00695 return 0;
00696 }
00697 else if (a.tv_sec > tv.tv_sec)
00698 {
00699 return ((a.tv_sec - tv.tv_sec) * 1000
00700 + (a.tv_usec - tv.tv_usec) / 1000);
00701 }
00702 else
00703 {
00704 return (a.tv_usec - tv.tv_usec) / 1000;
00705 }
00706 }
00707
00708 return -1;
00709 }
00710
00711
00712 bool WvStream::continue_select(time_t msec_timeout)
00713 {
00714 assert(uses_continue_select);
00715 assert(task);
00716 assert(taskman);
00717 assert(taskman->whoami() == task);
00718
00719 if (msec_timeout >= 0)
00720 alarm(msec_timeout);
00721
00722 running_callback = false;
00723 taskman->yield();
00724 alarm(-1);
00725
00726
00727
00728
00729
00730
00731
00732 return !alarm_was_ticking || select(0);
00733 }
00734
00735
00736 void WvStream::terminate_continue_select()
00737 {
00738 close();
00739 if (task)
00740 {
00741 while (task->isrunning())
00742 taskman->run(*task);
00743 task->recycle();
00744 task = NULL;
00745 }
00746 }
00747
00748
00749 const WvAddr *WvStream::src() const
00750 {
00751 return NULL;
00752 }
00753