/******************************************************

  The Channel

 ******************************************************/

#include "Python.h"

#ifdef STACKLESS
#include "stackless_impl.h"
#include "channelobject.h"

#define EXPERIMENTAL_CHANNEL_SOFT_SWITCHING 0

/*
 * tp_dealloc slot of the channel type.
 *
 * Clears pending weak references, kills every tasklet that is still
 * queued on the channel and finally hands the memory back via tp_free.
 */
static void
channel_dealloc(PyObject *op)
{
    PyChannelObject *c = (PyChannelObject *) op;

    if (c->chan_weakreflist != NULL)
        PyObject_ClearWeakRefs((PyObject *)c);
    /* NOTE(review): the loop only terminates if PyTasklet_Kill unlinks
       the tasklet from c->queue -- confirm against taskletobject.c */
    while (c->queue != NULL) {
        PyTasklet_Kill( (PyTaskletObject *) c->queue);
    }
    op->ob_type->tp_free(op);
}

/*
 * Link `task` into the channel's queue and adjust the balance by `dir`
 * (callers in this file pass +1 for a sender and -1 for a receiver).
 */
void
slp_channel_insert(PyChannelObject *channel, PyTaskletObject *task, int dir)
{
    PyTaskletObject **chain = &channel->queue;

    SLP_CHAIN_INSERT(PyTaskletObject, chain, task, next, prev);
    channel->balance += dir;
}

/*
 * Unlink the tasklet at the head of the channel's queue, undo its
 * contribution `dir` to the balance and return it.
 */
PyTaskletObject *
slp_channel_remove(PyChannelObject *channel, int dir)
{
    PyTaskletObject **chain = &channel->queue;
    PyTaskletObject *ret;

    channel->balance -= dir;
    /* the macro is used as a complete statement, as in the original */
    SLP_CHAIN_REMOVE(PyTaskletObject, chain, ret, next, prev)
    return ret;
}

/*
 * The special case to remove a specific tasklet.
 *
 * Note: we assume that the task is in the channel!
 * If `task` is not the queue head, the head pointer is temporarily
 * aimed at `task` so that the generic removal unlinks it, and is then
 * restored to the original head.
 */
PyTaskletObject *
slp_channel_remove_specific(PyChannelObject *channel, int dir,
                            PyTaskletObject *task)
{
    PyTaskletObject *head = channel->queue;

    if (head == task)
        return slp_channel_remove(channel, dir);
    channel->queue = task;
    slp_channel_remove(channel, dir);
    channel->queue = head;
    return task;
}

/*
 * Allocate a new, empty channel of the given type.
 *
 * `type` may be NULL, in which case PyChannel_Type is used; otherwise it
 * must be a subtype of PyChannel_Type or a TypeError is set.
 * Returns a new reference, or NULL with an exception set.
 */
PyChannelObject *
PyChannel_New(PyTypeObject *type)
{
    PyChannelObject *c;

    if (type == NULL)
        type = &PyChannel_Type;
    if (!PyType_IsSubtype(type, &PyChannel_Type))
        return TYPE_ERROR("channel_new: type must be subtype of channel", NULL);
    c = (PyChannelObject *) type->tp_alloc(type, 0);
    if (c != NULL) {
        c->queue = NULL;
        c->balance = 0;
        c->chan_weakreflist = NULL;
    }
    return c;
}

/* tp_new slot: channel() accepts no arguments at all. */
static PyObject *
channel_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    static char *argnames[] = {NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, ":channel", argnames))
        return NULL;
    return (PyObject *)PyChannel_New(type);
}

/* read-only attributes exposed to Python */
static PyMemberDef channel_members[] = {
    {"queue", T_OBJECT, offsetof(PyChannelObject, queue), READONLY,
     "the queue of waiting tasklets. None if no pending send/receive."},
    {"balance", T_INT, offsetof(PyChannelObject, balance), READONLY,
     "the number of tasklets waiting to send (>0) or receive (<0)."},
    {0}
};

/**********************************************************

  The central functions of the channel concept.
  A tasklet can either send or receive on a channel.
  A channel has a queue of waiting tasklets.
  They are either all waiting to send or all
  waiting to receive.
  Initially, a channel is in a neutral state.
  The queue is empty, there is no way to
  send or receive without becoming blocked.

  Sending 1):
    A tasklet wants to send and there is
    a queued receiving tasklet. The sender puts
    its data into the receiver, unblocks it,
    and inserts it at the top of the runnables.
    The receiver is scheduled.
  Sending 2):
    A tasklet wants to send and there is
    no queued receiving tasklet.
    The sender will become blocked and inserted
    into the queue. The next receiver will
    handle the rest through "Receiving 1)".
  Receiving 1):
    A tasklet wants to receive and there is
    a queued sending tasklet. The receiver takes
    its data from the sender, unblocks it,
    and inserts it at the end of the runnables.
    The receiver continues with no switch.
  Receiving 2):
    A tasklet wants to receive and there is
    no queued sending tasklet.
    The receiver will become blocked and inserted
    into the queue. The next sender will
    handle the rest through "Sending 1)".
 */

static char channel_send__doc__[] =
"channel.send(value) -- send a value over the channel.\n\
If no other tasklet is already receiving on the channel,\n\
the sender will be blocked. Otherwise, the receiver will\n\
be activated immediately, and the sender is put at the end of\n\
the runnables list.\n\
Note that an exception instance sent will be raised at the receiver\n\
(see also channel.send_exception)";

/* optional callable invoked on every channel action; NULL when unset */
static PyObject * channel_hook = NULL;

/*
 * Call the installed hook as hook(channel, task, sending, willblock).
 *
 * An exception that is pending on entry is saved around the call and
 * restored if the hook succeeds.  If the hook itself fails, the saved
 * exception is dropped and the hook's error stays set.
 * NOTE(review): nothing in this file inspects that error afterwards.
 */
static void
channel_callback(PyChannelObject *channel, PyTaskletObject *task,
                 int sending, int willblock)
{
    PyObject *args, *ret;
    PyObject *type, *value, *traceback;

    args = Py_BuildValue("(OOii)", channel, task, sending, willblock);
    if (args == NULL)
        return;
    PyErr_Fetch(&type, &value, &traceback);
    ret = PyObject_Call(channel_hook, args, NULL);
    if (ret != NULL) {
        PyErr_Restore(type, value, traceback);
    }
    else {
        Py_XDECREF(type);
        Py_XDECREF(value);
        Py_XDECREF(traceback);
    }
    Py_XDECREF(ret);
    Py_DECREF(args);
}

/*
 * Notify the hook (if any) about a send / receive action.
 *
 * Both macros expect a local `PyThreadState *ts` and RETURN `res` from
 * the enclosing function, with a RuntimeError set, when a notification
 * is already in progress (ts->st.schedlock).
 * NOTE(review): on that early return the caller has usually modified
 * the run queue / channel queue already.
 * Wrapped in do { } while (0) so they behave as one statement.
 */
#define NOTIFY_SEND(channel, task, willblock, res) \
    do { \
        if(channel_hook != NULL) { \
            if (ts->st.schedlock) { \
                return RUNTIME_ERROR("Recursive channel call due to callbacks!", res); \
            } \
            ts->st.schedlock = 1; \
            channel_callback(channel, task, 1, willblock); \
            ts->st.schedlock = 0; \
        } \
    } while (0)

#define NOTIFY_RECV(channel, task, willblock, res) \
    do { \
        if(channel_hook != NULL) { \
            if (ts->st.schedlock) { \
                return RUNTIME_ERROR("Recursive channel call due to callbacks!", res); \
            } \
            ts->st.schedlock = 1; \
            channel_callback(channel, task, 0, willblock); \
            ts->st.schedlock = 0; \
        } \
    } while (0)

/*
 * Install `callable` as the channel hook; NULL removes the hook.
 *
 * Returns 0 on success, -1 with TypeError set if `callable` is neither
 * NULL nor callable.
 */
int
PyStackless_SetChannelCallback(PyObject *callable)
{
    PyObject *previous;

    if(callable != NULL && !PyCallable_Check(callable))
        return TYPE_ERROR("channel callback must be callable", -1);
    /* Take the new reference before dropping the old one: releasing
       first could destroy the object when the same callable is
       installed again and the hook holds its only reference. */
    Py_XINCREF(callable);
    previous = channel_hook;
    channel_hook = callable;
    Py_XDECREF(previous);
    return 0;
}

/* Redo the send as a method call through PyStackless_CallMethod_Main;
   used by the implementations below when ts->st.main is NULL. */
static PyObject *
PyChannel_Send_M(PyChannelObject *self, PyObject *arg)
{
    return PyStackless_CallMethod_Main((PyObject *) self, "send", "(O)", arg);
}

/* C API: send `arg`, setting slp_try_stackless before dispatching to
   the type's `send` slot.  The result goes through slp_return_wrapper. */
int
PyChannel_Send_nr(PyChannelObject *self, PyObject *arg)
{
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;

    slp_try_stackless = 1;
    return slp_return_wrapper(t->send(self, arg));
}

/* C API: send `arg` through the type's `send` slot. */
int
PyChannel_Send(PyChannelObject *self, PyObject *arg)
{
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;

    return slp_return_wrapper(t->send(self, arg));
}

/* helper frame that keeps the channel and the tasklet alive while a
   soft-switched channel operation is outstanding */
typedef struct _channel_blocking_frame {
    PyBaseFrameObject bf;
    PyObject *channel;
    PyObject *tasklet;
} channel_blocking_frame;

/* number of PyObject* slots that follow the base frame */
#define CHANNEL_BLOCKING_FRAME_SIZE \
    ((sizeof(channel_blocking_frame)-sizeof(PyBaseFrameObject))/sizeof(PyObject*))

/*
 * Create a blocking frame that will run `exec`, holding new references
 * to `channel` and `tasklet`.  Returns NULL if slp_baseframe_new fails.
 */
channel_blocking_frame *
channel_blocking_frame_new(PyFrame_ExecFunc *exec, PyObject *channel,
                           PyObject *tasklet)
{
    channel_blocking_frame *rval = (channel_blocking_frame *)
        slp_baseframe_new(exec, 1, CHANNEL_BLOCKING_FRAME_SIZE);

    if (rval != NULL) {
        Py_INCREF(tasklet);
        rval->tasklet = tasklet;
        Py_INCREF(channel);
        rval->channel = channel;
    }
    return rval;
}

/*
 * Make a new blocking frame the current frame of the thread and return
 * Py_UnwindToken.  On allocation failure the previous frame is put back
 * and NULL is returned.
 */
static PyObject *
channel_blocking_defer(PyFrame_ExecFunc *exec, PyChannelObject *self,
                       PyTaskletObject *tasklet)
{
    PyThreadState *ts = PyThreadState_GET();
    PyFrameObject *save = ts->frame;

    /* create a helper frame to perform the schedule_remove after return */
    ts->frame = (PyFrameObject *)
        channel_blocking_frame_new(exec, (PyObject*)self, (PyObject*)tasklet);
    if (ts->frame == NULL) {
        ts->frame = save;
        return NULL;
    }
    Py_INCREF(ts->frame);
    /* the retval is still in ts->st.tempval, so we're ready */
    return Py_UnwindToken;
}

/* Exec function of the blocking frame used for a deferred send:
   releases the frame's references and pops the frame. */
static PyObject *
channel_blocking_finished_send(PyFrameObject *f)
{
    PyThreadState *ts = PyThreadState_GET();
    channel_blocking_frame *cbf = (channel_blocking_frame *) f;

    Py_DECREF(cbf->channel);
    Py_DECREF(cbf->tasklet);
    ts->frame = f->f_back;
    return Py_UnwindToken;
}

/*
 * Support for exception instances travelling over a channel.
 *
 * If `result` is an instance of Exception, it is installed as the
 * current error (via PyErr_Restore, which takes over the caller's
 * reference to `result`) and 1 is returned.  Otherwise nothing is
 * touched and 0 is returned.
 *
 * Special case: an Exception whose args are a 2-tuple starting with a
 * string is the wrapper built by send_exception for string exceptions;
 * it is un-wrapped into (string, value) again.
 *
 * NOTE(review): a -1 result of PyObject_IsInstance is treated as true,
 * exactly as in the code this helper was factored out of.
 */
static int
channel_raise_if_exception(PyObject *result)
{
    PyObject *type, *args;

    if (!PyObject_IsInstance(result, PyExc_Exception))
        return 0;

    type = (PyObject *) ((PyInstanceObject*)result)->in_class;
    args = PyObject_GetAttrString(result, "args");
    Py_INCREF(type);
    /* special case: un-wrap a string exception */
    if (type == PyExc_Exception && args != NULL && PyTuple_Check(args)
        && PyTuple_GET_SIZE(args) == 2
        && PyString_Check(PyTuple_GET_ITEM(args, 0)) ) {
        PyObject *t = PyTuple_GET_ITEM(args, 0);
        PyObject *a = PyTuple_GET_ITEM(args, 1);

        Py_INCREF(t);
        Py_DECREF(type);
        type = t;
        Py_INCREF(a);
        Py_DECREF(result);
        result = a;
    }
    Py_XDECREF(args);
    PyErr_Restore(type, result, NULL);
    return 1;
}

/*
 * Exec function of the blocking frame used for a deferred receive.
 * Raises the received value if it is an exception instance (returning
 * NULL), else returns Py_UnwindToken.  Either way the frame's references
 * are released and the frame is popped.
 * NOTE(review): ts->st.tempval is not reset here even when its
 * reference has been handed to PyErr_Restore -- unchanged behaviour.
 */
static PyObject *
channel_blocking_finished_recv(PyFrameObject *f)
{
    PyThreadState *ts = PyThreadState_GET();
    channel_blocking_frame *cbf = (channel_blocking_frame *) f;
    PyObject *rval;

    rval = channel_raise_if_exception(ts->st.tempval) ? NULL : Py_UnwindToken;
    Py_DECREF(cbf->channel);
    Py_DECREF(cbf->tasklet);
    ts->frame = f->f_back;
    return rval;
}

/*
 * Soft-switching variant of send (only wired in when
 * EXPERIMENTAL_CHANNEL_SOFT_SWITCHING is enabled).
 * Returns a new reference, Py_UnwindToken, or NULL on error.
 */
static CHANNEL_SEND_HEAD(impl_channel_send_nr)
{
    PyThreadState *ts = PyThreadState_GET();
    PyTaskletObject *sender, *receiver;
    PyObject *ret;

    assert(PyChannel_Check(self));
    if (ts->st.main == NULL)
        return PyChannel_Send_M(self, arg);

    sender = ts->st.current;
    assert(PyTasklet_Check(sender));

    if (self->balance < 0) {
        /* Sending 1): there is somebody listening */
        receiver = slp_channel_remove(self, -1);
        receiver->flags.blocked = 0;
        Py_INCREF(arg);
        receiver->tempval = arg;
        /* move sender at the end of runnables */
        ts->st.current = (PyTaskletObject*)sender->next;
        /* insert the receiver before the sender */
        slp_current_insert(receiver);
        NOTIFY_SEND(self, sender, 0, NULL);
        Py_INCREF(Py_None);
        ts->st.tempval = Py_None;
        switch (slp_schedule_nr_maybe(sender, receiver)) {
        case -1:
            return NULL;
        case 1:
            return Py_UnwindToken;
        default:
            break;
        }
        if (slp_schedule_task(sender, receiver))
            return NULL;
        if (ts->st.tempval == NULL)
            return NULL;
    }
    else {
        /* Sending 2): there is nobody listening */
        if (ts->st.flags.block_trap)
            return RUNTIME_ERROR("this tasklet does not like to be blocked.", NULL);
        if (ts->st.runcount == 1 && slp_revive_main())
            return RUNTIME_ERROR("the last runnable tasklet cannot be blocked.", NULL);
        slp_current_remove();
        ts->st.flags.blocked = 1;
        slp_channel_insert(self, sender, 1);
        NOTIFY_SEND(self, sender, 1, NULL);
        Py_INCREF(arg);
        ts->st.tempval = arg;
        receiver = ts->st.current;
        switch (slp_schedule_nr_maybe(sender, receiver)) {
        case -1:
            return NULL;
        case 1:
            return channel_blocking_defer(channel_blocking_finished_send,
                                          self, sender);
        default:
            break;
        }
        if (slp_schedule_task(sender, receiver))
            return NULL;
        if (ts->st.tempval == NULL) {
            if (sender->flags.blocked) {
                /* they *might* have invented code to unblock me early :-) */
                slp_channel_remove_specific(self, 1, sender);
                sender->flags.blocked = 0;
                if (ts->st.current == sender)
                    ts->st.current = NULL; /* CCP change */
                slp_current_insert(sender);
                ts->st.current = sender;
            }
            return NULL;
        }
    }
    /* hand the reference stored in tempval over to the caller */
    ret = ts->st.tempval;
    ts->st.tempval = NULL;
    return ret;
}

/*
 * Hard-switching implementation of channel.send.
 * Returns a new reference taken from ts->st.tempval, or NULL on error.
 */
static CHANNEL_SEND_HEAD(impl_channel_send)
{
    PyThreadState *ts = PyThreadState_GET();
    PyTaskletObject *sender, *receiver;
    PyObject *ret;

    assert(PyChannel_Check(self));
    if (ts->st.main == NULL)
        return PyChannel_Send_M(self, arg);

    sender = ts->st.current;
    assert(PyTasklet_Check(sender));

    if (self->balance < 0) {
        /* Sending 1): there is somebody listening */
        receiver = slp_channel_remove(self, -1);
        receiver->flags.blocked = 0;
        Py_INCREF(arg);
        receiver->tempval = arg;
        /* move sender at the end of runnables */
        ts->st.current = (PyTaskletObject*)sender->next;
        /* insert the receiver before the sender */
        slp_current_insert(receiver);
        /* make the argument temporarily visible via tempval
           (borrowed: no reference is taken for the hook call) */
        assert(sender->tempval == NULL);
        sender->tempval = arg;
        NOTIFY_SEND(self, sender, 0, NULL);
        sender->tempval = NULL;
        Py_INCREF(Py_None);
        ts->st.tempval = Py_None;
        if (slp_schedule_task(sender, receiver))
            return NULL;
        if (ts->st.tempval == NULL)
            return NULL;
    }
    else {
        /* Sending 2): there is nobody listening */
        if (ts->st.flags.block_trap)
            return RUNTIME_ERROR("this tasklet does not like to be blocked.", NULL);
        if (ts->st.runcount == 1 && slp_revive_main())
            return RUNTIME_ERROR("the last runnable tasklet cannot be blocked.", NULL);
        slp_current_remove();
        ts->st.flags.blocked = 1;
        slp_channel_insert(self, sender, 1);
        /* make the argument temporarily visible via tempval */
        assert(sender->tempval == NULL);
        sender->tempval = arg;
        NOTIFY_SEND(self, sender, 1, NULL);
        sender->tempval = NULL;
        Py_INCREF(arg);
        ts->st.tempval = arg;
        receiver = ts->st.current;
        if (slp_schedule_task(sender, receiver))
            return NULL;
        if (ts->st.tempval == NULL) {
            if (sender->flags.blocked) {
                /* they *might* have invented code to unblock me early :-) */
                slp_channel_remove_specific(self, 1, sender);
                sender->flags.blocked = 0;
                if (ts->st.current == sender)
                    ts->st.current = NULL; /* CCP change */
                slp_current_insert(sender);
                ts->st.current = sender;
            }
            return NULL;
        }
    }
    /* hand the reference stored in tempval over to the caller */
    ret = ts->st.tempval;
    ts->st.tempval = NULL;
    return ret;
}

/* `send` slot variant that goes through a Python-level method call */
static CHANNEL_SEND_HEAD(wrap_channel_send)
{
    return PyObject_CallMethod((PyObject *) self, "send", "(O)", arg);
}

/* compile-time switch between the soft- and hard-switching send */
static PyObject *
channel_send_nr(PyObject *myself, PyObject *arg)
{
#if EXPERIMENTAL_CHANNEL_SOFT_SWITCHING
    return impl_channel_send_nr((PyChannelObject*)myself, arg);
#else
    return impl_channel_send((PyChannelObject*)myself, arg);
#endif
}

/* Python method channel.send(value) */
static PyObject *
channel_send(PyObject *myself, PyObject *arg)
{
    return impl_channel_send((PyChannelObject*)myself, arg);
}

static char channel_send_exception__doc__[] =
"channel.send_exception(exc, value) -- send an exception over the channel.\n\
exc must be a subclass of Exception.\n\
Behavior is like channel.send, but that the receiver gets an exception.\n\
channel.send(exception_instance) has the same effect.";

/* Redo send_exception as a method call through
   PyStackless_CallMethod_Main; used when ts->st.main is NULL. */
static PyObject *
PyChannel_SendException_M(PyChannelObject *self, PyObject *klass,
                          PyObject *args)
{
    return PyStackless_CallMethod_Main((PyObject *) self, "send_exception",
                                       "(OO)", klass, args);
}

/* C API: send an exception through the type's `send_exception` slot. */
int
PyChannel_SendException(PyChannelObject *self, PyObject *klass, PyObject *args)
{
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;

    return slp_return_wrapper(t->send_exception(self, klass, args));
}

/*
 * Build the exception instance that send_exception transmits.
 *
 *   - klass is a subclass of Exception: instantiate klass(*args), where
 *     a non-tuple `args` is wrapped into a 1-tuple first.
 *   - klass is a string: wrap it as Exception(klass, args); the receiver
 *     un-wraps this form again.
 *   - anything else: TypeError.
 *
 * Neither argument's reference is consumed.  Returns a new reference or
 * NULL with an exception set.
 */
static PyObject *
channel_make_exception(PyObject *klass, PyObject *args)
{
    PyObject *exc = NULL;

    if (PyObject_IsSubclass(klass, PyExc_Exception) != 1) {
        if (! PyString_Check(klass) )
            return TYPE_ERROR("channel.send_exception needs Exception or string subclass as first parameter", NULL);
        /* PyObject_IsSubclass may have failed (-1) and left an error
           behind; we continue with the string, so discard it. */
        PyErr_Clear();
        /* we got a string instead of a class. Wrap a dummy around */
        args = Py_BuildValue("(OO)", klass, args);
        klass = PyExc_Exception;
    }
    else if (! PyTuple_Check(args) ) {
        args = Py_BuildValue("(O)", args);
    }
    else {
        Py_INCREF(args);
    }
    /* from here on we own one reference to args (or it is NULL) */
    if (args != NULL)
        exc = PyObject_Call(klass, args, NULL);
    Py_XDECREF(args);
    return exc;
}

/*
 * send_exception implementation that forwards the caller's
 * "try stackless" request to the type's `send` slot.
 */
static CHANNEL_SEND_EXCEPTION_HEAD(impl_channel_send_exception_nr)
{
    STACKLESS_GETARG();
    PyThreadState *ts = PyThreadState_GET();
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
    PyObject *exc;
    PyObject *ret;

    assert(PyChannel_Check(self));
    if (ts->st.main == NULL)
        return PyChannel_SendException_M(self, klass, args);

    exc = channel_make_exception(klass, args);
    if (exc == NULL)
        return NULL;
    slp_try_stackless = stackless;
    ret = t->send(self, exc);
    STACKLESS_ASSERT();
    Py_DECREF(exc);
    return ret;
}

/* send_exception implementation without soft-switching support */
static CHANNEL_SEND_EXCEPTION_HEAD(impl_channel_send_exception)
{
    PyThreadState *ts = PyThreadState_GET();
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
    PyObject *exc;
    PyObject *ret;

    assert(PyChannel_Check(self));
    if (ts->st.main == NULL)
        return PyChannel_SendException_M(self, klass, args);

    exc = channel_make_exception(klass, args);
    if (exc == NULL)
        return NULL;
    ret = t->send(self, exc);
    Py_DECREF(exc);
    return ret;
}

/* `send_exception` slot variants that go through a Python-level call */
static CHANNEL_SEND_EXCEPTION_HEAD(wrap_channel_send_exception_nr)
{
    return PyObject_CallMethod((PyObject *) self, "send_exception",
                               "(OO)", klass, args);
}

static CHANNEL_SEND_EXCEPTION_HEAD(wrap_channel_send_exception)
{
    return PyObject_CallMethod((PyObject *) self, "send_exception",
                               "(OO)", klass, args);
}

/*
 * Shared body of the Python-level send_exception methods.
 *
 * `args` is the method's argument tuple: item 0 is the exception class
 * (or string), the remaining items become the exception arguments.
 * Returns None on success, Py_UnwindToken when the send unwinds, or
 * NULL with an exception set.
 *
 * NOTE(review): both public wrappers have always dispatched to the _nr
 * implementation; kept as is.
 */
static PyObject *
channel_send_exception_common(PyObject *myself, PyObject *args)
{
    PyObject *result = NULL;
    PyObject *klass = PySequence_GetItem(args, 0);
    PyThreadState *ts = PyThreadState_GET();
    PyTaskletObject *caller = ts->st.current;

    if (klass == NULL)
        return VALUE_ERROR("channel.send_exception(e, v...)", NULL);
    /* from here on `args` is our own reference to the slice (or NULL) */
    args = PySequence_GetSlice(args, 1, PySequence_Size(args));
    if (!args)
        goto done;

    result = impl_channel_send_exception_nr((PyChannelObject*)myself,
                                            klass, args);
    if (result == NULL)
        goto done;
    if (result == Py_UnwindToken) {
        Py_INCREF(Py_None);
        caller->tempval = Py_None;
        /* fall into the common cleanup: klass and args used to be
           leaked on this path */
        goto done;
    }
    /* the send handed us an owned reference; release it before
       substituting None so that it is not leaked */
    Py_DECREF(result);
    Py_INCREF(Py_None);
    result = Py_None;

done:
    Py_DECREF(klass);
    Py_XDECREF(args);
    return result;
}

static PyObject *
channel_send_exception_nr(PyObject *myself, PyObject *args)
{
    return channel_send_exception_common(myself, args);
}

/* Python method channel.send_exception(exc, value...) */
static PyObject *
channel_send_exception(PyObject *myself, PyObject *args)
{
    return channel_send_exception_common(myself, args);
}

static char channel_receive__doc__[] =
"channel.receive() -- receive a value over the channel.\n\
If no other tasklet is already sending on the channel,\n\
the receiver will be blocked. Otherwise, the receiver will\n\
continue immediately, and the sender is put at the end of\n\
the runnables list.\n\
If an exception instance is sent, it will be raised at the receiver.";

/* Redo the receive as a method call through
   PyStackless_CallMethod_Main; used when ts->st.main is NULL. */
static PyObject *
PyChannel_Receive_M(PyChannelObject *self)
{
    return PyStackless_CallMethod_Main((PyObject *) self, "receive", NULL);
}

/* C API: receive, setting slp_try_stackless before dispatching to the
   type's `receive` slot. */
PyObject *
PyChannel_Receive_nr(PyChannelObject *self)
{
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;
    PyObject *ret;

    slp_try_stackless = 1;
    ret = t->receive(self);
    STACKLESS_ASSERT();
    return ret;
}

/* C API: receive through the type's `receive` slot. */
PyObject *
PyChannel_Receive(PyChannelObject *self)
{
    PyChannel_HeapType *t = (PyChannel_HeapType *) self->ob_type;

    return t->receive(self);
}

/*
 * Soft-switching variant of receive (only wired in when
 * EXPERIMENTAL_CHANNEL_SOFT_SWITCHING is enabled).
 * Returns the received value, Py_UnwindToken, or NULL with the received
 * or another exception set.
 */
static CHANNEL_RECEIVE_HEAD(impl_channel_receive_nr)
{
    PyThreadState *ts = PyThreadState_GET();
    PyTaskletObject *sender, *receiver;
    PyObject *result;

    if (ts->st.main == NULL)
        return PyChannel_Receive_M(self);

    receiver = ts->st.current;
    assert(PyTasklet_Check(receiver));

    if (self->balance > 0) {
        /* Receiving 1): there is somebody talking */
        sender = slp_channel_remove(self, 1);
        sender->flags.blocked = 0;
        /* take over the sender's value and leave None behind */
        result = sender->tempval;
        sender->tempval = Py_None;
        Py_INCREF(Py_None);
        /* move sender to the end of runnables */
        slp_current_insert(sender);
        NOTIFY_RECV(self, receiver, 0, NULL);
    }
    else {
        /* Receiving 2): there is nobody talking */
        if (ts->st.flags.block_trap)
            return RUNTIME_ERROR("this tasklet does not like to be blocked.", NULL);
        if (ts->st.runcount == 1 && slp_revive_main())
            return RUNTIME_ERROR("the last runnable tasklet cannot be blocked.", NULL);
        slp_current_remove();
        ts->st.flags.blocked = -1;
        slp_channel_insert(self, receiver, -1);
        NOTIFY_RECV(self, receiver, 1, NULL);
        /* note that we don't set tempval since the sender does it */
        switch (slp_schedule_nr_maybe(receiver, ts->st.current)) {
        case -1:
            return NULL;
        case 1:
            return channel_blocking_defer(channel_blocking_finished_recv,
                                          self, receiver);
        default:
            break;
        }
        if (slp_schedule_task(receiver, ts->st.current))
            return NULL;
        if (ts->st.tempval == NULL) {
            /* reactivate on exception */
            if (receiver->flags.blocked) {
                /* they *might* have invented code to unblock me early :-) */
                slp_channel_remove_specific(self, -1, receiver);
                receiver->flags.blocked = 0;
                if (ts->st.current == receiver)
                    ts->st.current = NULL; /* CCP change */
                slp_current_insert(receiver);
                ts->st.current = receiver;
            }
            return NULL;
        }
        result = ts->st.tempval;
        ts->st.tempval = NULL;
    }
    /* support for exception instances */
    if (channel_raise_if_exception(result))
        return NULL;
    return result;
}

/*
 * Hard-switching implementation of channel.receive.
 * Returns the received value, or NULL with the received or another
 * exception set.
 */
static CHANNEL_RECEIVE_HEAD(impl_channel_receive)
{
    PyThreadState *ts = PyThreadState_GET();
    PyTaskletObject *sender, *receiver;
    PyObject *result;

    if (ts->st.main == NULL)
        return PyChannel_Receive_M(self);

    receiver = ts->st.current;
    assert(PyTasklet_Check(receiver));

    if (self->balance > 0) {
        /* Receiving 1): there is somebody talking */
        sender = slp_channel_remove(self, 1);
        sender->flags.blocked = 0;
        /* the hook runs while the value is still stored in the sender */
        NOTIFY_RECV(self, receiver, 0, NULL);
        /* take over the sender's value and leave None behind */
        result = sender->tempval;
        sender->tempval = Py_None;
        Py_INCREF(Py_None);
        /* move sender to the end of runnables */
        slp_current_insert(sender);
    }
    else {
        /* Receiving 2): there is nobody talking */
        if (ts->st.flags.block_trap)
            return RUNTIME_ERROR("this tasklet does not like to be blocked.", NULL);
        if (ts->st.runcount == 1 && slp_revive_main())
            return RUNTIME_ERROR("the last runnable tasklet cannot be blocked.", NULL);
        slp_current_remove();
        ts->st.flags.blocked = -1;
        slp_channel_insert(self, receiver, -1);
        NOTIFY_RECV(self, receiver, 1, NULL);
        /* note that we don't set tempval since the sender does it */
        if (slp_schedule_task(receiver, ts->st.current))
            return NULL;
        if (ts->st.tempval == NULL) {
            /* reactivate on exception */
            if (receiver->flags.blocked) {
                /* they *might* have invented code to unblock me early :-) */
                slp_channel_remove_specific(self, -1, receiver);
                receiver->flags.blocked = 0;
                if (ts->st.current == receiver)
                    ts->st.current = NULL;
                /* we know that receiver was linked into channel, not current */
                slp_current_insert(receiver);
                ts->st.current = receiver;
            }
            return NULL;
        }
        result = ts->st.tempval;
        ts->st.tempval = NULL;
    }
    /* support for exception instances */
    if (channel_raise_if_exception(result))
        return NULL;
    return result;
}

/* `receive` slot variants that go through a Python-level method call */
static CHANNEL_RECEIVE_HEAD(wrap_channel_receive_nr)
{
    return PyObject_CallMethod((PyObject *) self, "receive", NULL);
}

static CHANNEL_RECEIVE_HEAD(wrap_channel_receive)
{
    return PyObject_CallMethod((PyObject *) self, "receive", NULL);
}

/* compile-time switch between the soft- and hard-switching receive */
static PyObject *
channel_receive_nr(PyObject *myself)
{
#if EXPERIMENTAL_CHANNEL_SOFT_SWITCHING
    return impl_channel_receive_nr((PyChannelObject*)myself);
#else
    return impl_channel_receive((PyChannelObject*)myself);
#endif
}

/* Python method channel.receive() */
static PyObject *
channel_receive(PyObject *myself)
{
    return impl_channel_receive((PyChannelObject*)myself);
}

/* C-level method slots stored in the heap type (see init_channeltype) */
static PyCMethodDef
channel_cmethods[] = {
    CMETHOD_PUBLIC_ENTRY(PyChannel_HeapType, channel, send),
    CMETHOD_PUBLIC_ENTRY(PyChannel_HeapType, channel, send_exception),
    CMETHOD_PUBLIC_ENTRY(PyChannel_HeapType, channel, receive),
    {NULL}                       /* sentinel */
};

/* methods visible from Python */
static PyMethodDef
channel_methods[] = {
    {"send",            (PyCFunction)channel_send,            METH_O,
     channel_send__doc__},
    {"send_exception",  (PyCFunction)channel_send_exception,  METH_VARARGS,
     channel_send_exception__doc__},
    {"receive",         (PyCFunction)channel_receive,         METH_NOARGS,
     channel_receive__doc__},
    {NULL,              NULL}    /* sentinel */
};

static char channel__doc__[] =
"A channel object is used for communication between tasklets.\n\
By sending on a channel, a tasklet that is waiting to receive\n\
is resumed. If there is no waiting receiver, the sender is suspended.\n\
By receiving from a channel, a tasklet that is waiting to send\n\
is resumed. If there is no waiting sender, the receiver is suspended.\
";

/* static template from which init_channeltype builds the real type */
PyTypeObject _PyChannel_Type = {
    PyObject_HEAD_INIT(&PyType_Type)
    0,
    "channel",
    sizeof(PyChannelObject),
    0,
    (destructor)channel_dealloc,            /* tp_dealloc */
    0,                                      /* tp_print */
    0,                                      /* tp_getattr */
    0,                                      /* tp_setattr */
    0,                                      /* tp_compare */
    0,                                      /* tp_repr */
    0,                                      /* tp_as_number */
    0,                                      /* tp_as_sequence */
    0,                                      /* tp_as_mapping */
    0,                                      /* tp_hash */
    0,                                      /* tp_call */
    0,                                      /* tp_str */
    PyObject_GenericGetAttr,                /* tp_getattro */
    PyObject_GenericSetAttr,                /* tp_setattro */
    0,                                      /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    channel__doc__,                         /* tp_doc */
    0,                                      /* tp_traverse */
    0,                                      /* tp_clear */
    0,                                      /* tp_richcompare */
    offsetof(PyChannelObject, chan_weakreflist), /* tp_weaklistoffset */
    0,                                      /* tp_iter */
    0,                                      /* tp_iternext */
    channel_methods,                        /* tp_methods */
    channel_members,                        /* tp_members */
    0,                                      /* tp_getset */
    0,                                      /* tp_base */
    0,                                      /* tp_dict */
    0,                                      /* tp_descr_get */
    0,                                      /* tp_descr_set */
    0,                                      /* tp_dictoffset */
    0,                                      /* tp_init */
    0,                                      /* tp_alloc */
    channel_new,                            /* tp_new */
    _PyObject_Del,                          /* tp_free */
};

/* the type actually used at run time; set by init_channeltype */
PyTypeObject *PyChannel_TypePtr = NULL;

/******************************************************

  source module initialization

 ******************************************************/

/*
 * Build the channel type from the static template with
 * PyFlexType_Build and publish it through PyChannel_TypePtr.
 * Returns 0 on success, -1 if the type could not be built.
 */
int
init_channeltype(void)
{
    PyTypeObject *base = &_PyChannel_Type;
    PyTypeObject *t;

    t = PyFlexType_Build("stackless", "channel", base->tp_doc, "", base,
                         sizeof(PyChannel_HeapType), channel_cmethods);
    if (t == NULL)
        return -1;
    PyChannel_TypePtr = t;
    return 0;
}
#endif