aboutsummaryrefslogtreecommitdiff
path: root/src/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/thread.c')
-rw-r--r--src/thread.c342
1 files changed, 214 insertions, 128 deletions
diff --git a/src/thread.c b/src/thread.c
index 5aed694..f166c3f 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -3,6 +3,8 @@
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
+#include <signal.h>
+#include <sys/eventfd.h>
#include "types/str.h"
#include "util.h"
@@ -11,80 +13,16 @@
#include "table.h"
struct thread_info {
- str* function;
- lua_State* L;
- int return_count, done;
- pthread_t tid;
- pthread_mutex_t* lock;
+ str* function;
+ lua_State* L;
+ int return_count, done, request_close;
+ pthread_t tid;
+ pthread_mutex_t* lock, *ready_lock, *close_lock;
+ pthread_cond_t* cond;
};
#include "io.h"
-//give the current thread priority to locking thread_lock_lock (fixes race conds)
-pthread_mutex_t thread_priority_lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t thread_lock_lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
-larray_t* thread_locks = NULL;
-
-void lib_thread_clean(){
- if(thread_locks == NULL) return;
-
- for(int i = 0; i != thread_locks->size; i++){
- if(thread_locks->arr[i].used){
- //pthread_mutex_destroy(thread_locks->arr[i].value);
- free(thread_locks->arr[i].value);
- }
- }
-
- larray_clear(thread_locks);
-}
-int l_tlock(lua_State* L){
- int idx = luaL_checkinteger(L, 1);
-
- pthread_mutex_lock(&thread_lock_lock);
- //pthread_mutex_lock(&thread_priority_lock);
- //pthread_mutex_unlock(&thread_priority_lock);
- pthread_mutex_t mutex;
- if(thread_locks == NULL) thread_locks = larray_init();
- int i = 0;
- if((i = larray_geti(thread_locks, idx)) == -1){
- pthread_mutex_init(&mutex, NULL);
- pthread_mutex_lock(&mutex);
- pthread_mutex_t* mp = malloc(sizeof * mp);
- *mp = mutex;
- larray_set(&thread_locks, idx, (void*)mp);
- } else {
- pthread_mutex_t *m = (pthread_mutex_t*)thread_locks->arr[i].value;
- pthread_mutex_lock(&thread_priority_lock);
-
- pthread_mutex_unlock(&thread_lock_lock);
- pthread_mutex_lock(m);
- pthread_mutex_lock(&thread_lock_lock);
-
- pthread_mutex_unlock(&thread_priority_lock);
- thread_locks->arr[i].value = (void*)m;
-
- }
-
- pthread_mutex_unlock(&thread_lock_lock);
- return 0;
-}
-
-int l_tunlock(lua_State* L){
- int idx = luaL_checkinteger(L, 1);
-
- pthread_mutex_lock(&thread_lock_lock);
- int i = 0;
- if(thread_locks != NULL && (i = larray_geti(thread_locks, idx)) != -1){
- pthread_mutex_t *m = (pthread_mutex_t*)thread_locks->arr[i].value;
-
- pthread_mutex_unlock(m);
- thread_locks->arr[i].value = (void*)m;
- }
-
- pthread_mutex_unlock(&thread_lock_lock);
- return 0;
-}
-
int _mutex_lock(lua_State* L){
lua_pushstring(L, "_");
lua_gettable(L, 1);
@@ -145,37 +83,85 @@ int l_mutex(lua_State* L){
}
int l_res(lua_State* L){
- int return_count = lua_gettop(L) - 1;
- lua_pushstring(L, "_");
- lua_gettable(L, 1);
- struct thread_info* info = lua_touserdata(L, -1);
- info->return_count = return_count;
+ int return_count = lua_gettop(L) - 1;
+ lua_pushstring(L, "_");
+ lua_gettable(L, 1);
+ struct thread_info* info = lua_touserdata(L, -1);
+ info->return_count = return_count;
+
+ lua_newtable(L);
+ int idx = lua_gettop(L);
- for(int i = info->return_count - 1; i != -1; i--){
- int ot = lua_gettop(L);
+ for(int i = info->return_count - 1; i != -1; i--){
+ lua_pushinteger(L, lua_rawlen(L, idx) + 1);
+ lua_pushvalue(L, i + 2);
+ lua_settable(L, idx);
+ }
- lua_pushvalue(L, 2 + i);
- luaI_deepcopy(L, info->L, 0);
+ env_table(L, 0);
+ lua_setglobal(L, "_res_locals");
+
+ lua_pushvalue(L, idx);
+ lua_setglobal(L, "_return_table");
+
+ lua_pushstring(L, "res():exit");
+ lua_error(L);
+
+ return 1;
+}
+
+int _res_testclose(lua_State* L){
+ lua_pushstring(L, "_");
+ lua_gettable(L, 1);
+ struct thread_info* info = lua_touserdata(L, -1);
+
+ pthread_mutex_lock(&*info->close_lock);
+ pthread_cond_signal(&*info->cond);
+
+ if(info->request_close){
+ info->done = 1;
- lua_settop(L, ot);
- }
-
pthread_mutex_unlock(&*info->lock);
+ pthread_mutex_unlock(&*info->close_lock);
pthread_exit(NULL);
- p_error("thread did not exit");
+ }
- return 1;
+ pthread_mutex_unlock(&*info->close_lock);
+
+ return 0;
+}
+
+void _res_testclose_debug(lua_State* L, lua_Debug* d){
+ _res_testclose(L);
+}
+
+int _res_autoclose(lua_State* L){
+ lua_sethook(L, _res_testclose_debug, LUA_HOOKLINE, 1);
+ return 0;
+}
+
+void _thread_exit_signal(int i){
+ pthread_exit(NULL);
}
void* handle_thread(void* _args){
struct thread_info* args = (struct thread_info*)_args;
lua_State* L = args->L;
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+ pthread_mutex_lock(&*args->lock);
+
+ signal(SIGUSR1, _thread_exit_signal);
+
pthread_detach(args->tid);
+ //unlock main
+ pthread_mutex_lock(&*args->ready_lock);
+ pthread_cond_signal(&*args->cond);
+ pthread_mutex_unlock(&*args->ready_lock);
lua_newtable(L);
int res_idx = lua_gettop(L);
+ luaI_tsetcf(L, res_idx, "testclose", _res_testclose);
+ luaI_tsetcf(L, res_idx, "autoclose", _res_autoclose);
luaI_tsetlud(L, res_idx, "_", args);
lua_newtable(L);
@@ -191,10 +177,19 @@ void* handle_thread(void* _args){
lua_assign_upvalues(L, x);
lua_pushvalue(L, res_idx);
- lua_call(L, 1, 0);
-
+ if(lua_pcall(L, 1, 0, 0) != LUA_OK){
+ if(!(lua_type(L, -1) == LUA_TSTRING && strcmp("res():exit", lua_tostring(L, -1)) == 0)){
+ lua_error(L);
+ }
+ lua_pop(L, 1);
+ }
+ args->done = 1;
pthread_mutex_unlock(&*args->lock);
+ pthread_mutex_lock(&*args->close_lock);
+ pthread_cond_signal(&*args->cond);
+ pthread_mutex_unlock(&*args->close_lock);
+
return NULL;
}
@@ -202,20 +197,36 @@ int _thread_await(lua_State* L){
lua_pushstring(L, "_");
lua_gettable(L, 1);
struct thread_info* info = lua_touserdata(L, -1);
-
if(info->L == NULL) luaI_error(L, -1, "thread was already closed")
- if(!info->done) pthread_mutex_lock(&*info->lock);
- info->done = 1;
+ if(info->tid == 0) luaI_error(L, -2, "thread was killed early")
+
+ pthread_mutex_lock(&*info->lock);
+
+ if(info->return_count == 0) return 0;
+ lua_getglobal(L, "_locals");
+ int old_locals = lua_gettop(L);
+
+ lua_getglobal(info->L, "_stored_locals");
+ lua_getglobal(info->L, "_res_locals");
+ luaI_jointable(info->L);
+ luaI_deepcopy(info->L, L, SKIP_LOCALS | STRIP_GC);
+ //env_table(L, 0);
+
+ lua_setglobal(L, "_locals");
- for(int i = 0; i != info->return_count; i++){
- int ot = lua_gettop(info->L);
+ lua_getglobal(info->L, "_return_table");
+ int idx = lua_gettop(info->L);
- lua_pushvalue(info->L, ot - info->return_count + i);
- luaI_deepcopy(info->L, L, 0);
-
- lua_settop(info->L, ot);
+ for(int i = info->return_count; i != 0; i--){
+ lua_pushinteger(info->L, i);
+ lua_gettable(info->L, idx);
+ luaI_deepcopy(info->L, L, STRIP_GC);
}
+ lua_pushvalue(L, old_locals);
+ lua_setglobal(L, "_locals");
+ pthread_mutex_unlock(&*info->lock);
+
return info->return_count;
}
@@ -224,19 +235,45 @@ int _thread_clean(lua_State* L){
lua_gettable(L, 1);
struct thread_info* info = lua_touserdata(L, -1);
if(info != NULL && info->L != NULL){
-
+ luaI_tsetnil(L, 1, "_");
+
+ if(info->tid != 0 && !info->done){
+ pthread_kill(info->tid, SIGUSR1);
+ }
+
//lua_gc(info->L, LUA_GCRESTART);
lua_gc(info->L, LUA_GCCOLLECT);
+
lua_close(info->L);
+
info->L = NULL;
-
+
pthread_mutex_destroy(&*info->lock);
free(info->lock);
- pthread_cancel(info->tid);
+ pthread_mutex_destroy(&*info->close_lock);
+ free(info->close_lock);
+
+ pthread_cond_destroy(&*info->cond);
+ free(info->cond);
+
free(info);
+ }
+ return 0;
+}
- luaI_tsetlud(L, 1, "_", NULL);
+int _thread_kill(lua_State* L){
+ lua_pushstring(L, "_");
+ lua_gettable(L, 1);
+ struct thread_info* info = lua_touserdata(L, -1);
+
+ if(info->tid != 0){
+ pthread_kill(info->tid, SIGUSR1);
+ pthread_mutex_lock(&*info->close_lock);
+ pthread_cond_signal(&*info->cond);
+ pthread_mutex_unlock(&*info->close_lock);
}
+ info->tid = 0;
+
return 0;
}
@@ -245,7 +282,16 @@ int _thread_close(lua_State* L){
lua_gettable(L, 1);
struct thread_info* info = lua_touserdata(L, -1);
- pthread_cancel(info->tid);
+ if(info->tid == 0) return 0;
+
+ pthread_mutex_lock(&*info->close_lock);
+
+ info->request_close = 1;
+
+ pthread_cond_wait(&*info->cond, &*info->close_lock);
+ pthread_mutex_unlock(&*info->close_lock);
+
+ info->tid = 0;
return 0;
}
@@ -256,24 +302,43 @@ int l_async(lua_State* oL){
luaL_openlibs(L);
luaI_copyvars(oL, L);
-
+ luaL_openlibs(L);
+
+ lua_getglobal(L, "_locals");
+ lua_setglobal(L, "_stored_locals");
+
struct thread_info* args = calloc(1, sizeof * args);
args->L = L;
args->lock = malloc(sizeof * args->lock);
pthread_mutex_init(&*args->lock, NULL);
- pthread_mutex_lock(&*args->lock);
+ args->ready_lock = malloc(sizeof * args->ready_lock);
+ pthread_mutex_init(&*args->ready_lock, NULL);
+ args->close_lock = malloc(sizeof * args->close_lock);
+ pthread_mutex_init(&*args->close_lock, NULL);
+
args->return_count = 0;
-
+ args->cond = malloc(sizeof * args->cond);
+ pthread_cond_init(&*args->cond, NULL);
+
args->function = str_init("");
lua_pushvalue(oL, 1);
lua_dump(oL, writer, (void*)args->function, 0);
-
+
+ pthread_mutex_lock(&*args->ready_lock);
+
pthread_create(&args->tid, NULL, handle_thread, (void*)args);
+ pthread_cond_wait(&*args->cond, &*args->ready_lock);
+ pthread_mutex_unlock(&*args->ready_lock);
+
+ pthread_mutex_destroy(&*args->ready_lock);
+ free(args->ready_lock);
+
lua_newtable(oL);
int res_idx = lua_gettop(oL);
luaI_tsetcf(oL, res_idx, "await", _thread_await);
luaI_tsetcf(oL, res_idx, "clean", _thread_clean);
+ luaI_tsetcf(oL, res_idx, "kill", _thread_kill);
luaI_tsetcf(oL, res_idx, "close", _thread_close);
luaI_tsetlud(oL, res_idx, "_", args);
@@ -295,7 +360,7 @@ struct thread_buffer {
int _buffer_get(lua_State* L){
struct thread_buffer *buffer = lua_touserdata(L, 1);
pthread_mutex_lock(&*buffer->lock);
- luaI_deepcopy(buffer->L, L, SKIP_GC);
+ luaI_deepcopy(buffer->L, L, SKIP_GC | SKIP_LOCALS);
pthread_mutex_unlock(&*buffer->lock);
return 1;
}
@@ -304,33 +369,31 @@ int _buffer_set(lua_State* L){
struct thread_buffer *buffer = lua_touserdata(L, 1);
pthread_mutex_lock(&*buffer->lock);
lua_settop(buffer->L, 0);
- luaI_deepcopy(L, buffer->L, SKIP_GC);
+ luaI_deepcopy(L, buffer->L, SKIP_LOCALS | STRIP_GC);
pthread_mutex_unlock(&*buffer->lock);
+
return 1;
}
-#include <assert.h>
-_Atomic int used = 0;
-
int _buffer_mod(lua_State* L){
struct thread_buffer *buffer = lua_touserdata(L, 1);
pthread_mutex_lock(&*buffer->lock);
- //printf("%p\n", &*buffer->lock);
- assert(used == 0);
- used = 1;
-
- luaI_deepcopy(buffer->L, L, SKIP_GC);
+
+ luaI_deepcopy(buffer->L, L, SKIP_GC | SKIP_LOCALS);
int item = lua_gettop(L);
lua_pushvalue(L, 2);
lua_pushvalue(L, item);
lua_call(L, 1, 1);
if(lua_type(L, -1) != LUA_TNIL){
+ int idx = lua_gettop(L);
lua_settop(buffer->L, 0);
- luaI_deepcopy(L, buffer->L, SKIP_GC);
- }
+ luaI_deepcopy(L, buffer->L, STRIP_GC | SKIP_LOCALS);
- used = 0;
+ lua_getmetatable(L, idx);
+ idx = lua_gettop(L);
+ luaI_tsetnil(L, idx, "__gc");
+ }
pthread_mutex_unlock(&*buffer->lock);
return 1;
@@ -344,6 +407,7 @@ int l_buffer_index(lua_State* L){
hash = fnv_1((uint8_t*)str, len, v_1);
+ //maybe strcmp after the hash has been verified?
switch(hash){
case 0xd8c8ad186b9ed323: //get
lua_pushcfunction(L, _buffer_get);
@@ -362,7 +426,7 @@ int l_buffer_index(lua_State* L){
return 1;
}
- luaI_deepcopy(buffer->L, L, SKIP_GC);
+ luaI_deepcopy(buffer->L, L, SKIP_GC | SKIP_LOCALS);
lua_pop(buffer->L, 1);
break;
}
@@ -374,41 +438,43 @@ int hi(lua_State* L){
return 0;
}
-//not thread safe yet
int meta_proxy(lua_State* L){
int argc = lua_gettop(L);
struct thread_buffer *buffer = lua_touserdata(L, 1);
+ pthread_mutex_lock(&*buffer->lock);
lua_getmetatable(buffer->L, 1);
lua_pushstring(buffer->L, lua_tostring(L, 2));
lua_gettable(buffer->L, 2);
-
+
lua_pushvalue(buffer->L, 1);
int count = 0;
for(int i = 4; i <= argc; i++){
count++;
lua_pushvalue(L, i);
- luaI_deepcopy(L, buffer->L, SKIP_GC);
+ luaI_deepcopy(L, buffer->L, SKIP_GC | SKIP_LOCALS);
}
//printf("%i\n",count);
lua_call(buffer->L, count + 1, 1);
- luaI_deepcopy(buffer->L, L, 0);
+ luaI_deepcopy(buffer->L, L, SKIP_LOCALS | STRIP_GC);
lua_pushnil(buffer->L);
lua_setmetatable(buffer->L, -2);
lua_settop(buffer->L, 1);
+ pthread_mutex_unlock(&*buffer->lock);
//printf("%p\n", lua_topointer(buffer->L, -1));
return 1;
}
+#warning "make this reapply for new objects!"
void meta_proxy_gen(lua_State* L, struct thread_buffer *buffer, int meta_idx, int new_meta_idx){
lua_pushcfunction(L, meta_proxy);
lua_setglobal(L, "__proxy_call");
-
+
lua_pushlightuserdata(L, buffer);
lua_setglobal(L, "__this_obj");
@@ -423,7 +489,7 @@ void meta_proxy_gen(lua_State* L, struct thread_buffer *buffer, int meta_idx, in
char* fn = calloc(128, sizeof * fn);
const char* key = lua_tostring(L, k);
sprintf(fn, "return function(...)\
-return __proxy_call(__this_obj,'%s',...);end", key);
+ return __proxy_call(__this_obj,'%s',...);end", key);
luaL_dostring(L, fn);
free(fn);
@@ -438,6 +504,9 @@ return __proxy_call(__this_obj,'%s',...);end", key);
int l_buffer_gc(lua_State* L){
struct thread_buffer *buffer = lua_touserdata(L, 1);
pthread_mutex_lock(&*buffer->lock);
+ pthread_mutex_unlock(&*buffer->lock);
+ //race condition here, if something can manage to lock the thread between these two lines
+ //add maybe a closing variable thats checked for
pthread_mutex_destroy(&*buffer->lock);
free(buffer->lock);
@@ -453,10 +522,11 @@ int l_buffer(lua_State* L){
int buffer_idx = lua_gettop(L);
buffer->L = luaL_newstate();
+ lua_gc(buffer->L, LUA_GCSTOP);
buffer->lock = malloc(sizeof * buffer->lock);
if(pthread_mutex_init(&*buffer->lock, NULL) != 0) p_fatal("pthread_mutex_init failed");
lua_pushvalue(L, 1);
- luaI_deepcopy(L, buffer->L, SKIP_GC);
+ luaI_deepcopy(L, buffer->L, SKIP_LOCALS | STRIP_GC);
lua_newtable(L);
int meta_idx = lua_gettop(L);
@@ -464,14 +534,30 @@ int l_buffer(lua_State* L){
luaI_tsetcf(L, meta_idx, "__index", l_buffer_index);
luaI_tsetcf(L, meta_idx, "__gc", l_buffer_gc);
+ if(use != 0){
+ lua_getmetatable(L, 1);
+ int idx = lua_gettop(L);
+ luaI_tsetnil(L, idx, "__gc");
+ }
+
lua_pushvalue(L, meta_idx);
lua_setmetatable(L, buffer_idx);
lua_pushvalue(L, buffer_idx);
return 1;
}
-void _lua_getfenv(lua_State* L){
-
+int l_usleep(lua_State* L){
+ uint64_t n = lua_tonumber(L, 1);
+ usleep(n);
+
+ return 0;
+}
+
+int l_sleep(lua_State* L){
+ double n = lua_tonumber(L, 1);
+ usleep(n * 1000 * 1000);
+
+ return 0;
}
int l_testcopy(lua_State* L){