File: //home/ubuntu/neovim/.deps/build/src/luv/src/work.c
/*
* Copyright 2014 The Luvit Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "private.h"
typedef struct {
lua_State* L; /* vm in main */
char* code; /* thread entry code */
size_t len;
int after_work_cb; /* ref, run in main ,call after work cb*/
} luv_work_ctx_t;
typedef struct {
uv_work_t work;
luv_work_ctx_t* ctx;
luv_thread_arg_t args;
luv_thread_arg_t rets;
int ref; /* ref to luv_work_ctx_t, which create a new uv_work_t*/
} luv_work_t;
static uv_once_t once_vmkey = UV_ONCE_INIT;
static uv_key_t tls_vmkey; /* thread local storage key for Lua state */
static uv_mutex_t vm_mutex;
static unsigned int idx_vms = 0;
static unsigned int nvms = 0;
static lua_State** vms;
static lua_State* default_vms[4];
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#endif
#if LUV_UV_VERSION_GEQ(1, 30, 0)
#define MAX_THREADPOOL_SIZE 1024
#else
#define MAX_THREADPOOL_SIZE 128
#endif
static luv_work_ctx_t* luv_check_work_ctx(lua_State* L, int index) {
luv_work_ctx_t* ctx = (luv_work_ctx_t*)luaL_checkudata(L, index, "luv_work_ctx");
return ctx;
}
static int luv_work_ctx_gc(lua_State *L) {
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
free(ctx->code);
luaL_unref(L, LUA_REGISTRYINDEX, ctx->after_work_cb);
return 0;
}
static int luv_work_ctx_tostring(lua_State* L) {
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
lua_pushfstring(L, "luv_work_ctx_t: %p", ctx);
return 1;
}
static int luv_work_cb(lua_State* L) {
uv_work_t* req = lua_touserdata(L, 1);
luv_work_t* work = (luv_work_t*)req->data;
luv_work_ctx_t* ctx = work->ctx;
luv_ctx_t *lctx = luv_context(L);
lua_pop(L, 1);
int top = lua_gettop(L);
/* push lua function */
lua_pushlstring(L, ctx->code, ctx->len);
lua_rawget(L, LUA_REGISTRYINDEX);
if (lua_isnil(L, -1)) {
lua_pop(L, 1);
lua_pushlstring(L, ctx->code, ctx->len);
if (luaL_loadbuffer(L, ctx->code, ctx->len, "=pool") != 0)
{
fprintf(stderr, "Uncaught Error in work callback: %s\n", lua_tostring(L, -1));
lua_pop(L, 2);
lua_pushnil(L);
} else {
lua_pushvalue(L, -1);
lua_insert(L, lua_gettop(L) - 2);
lua_rawset(L, LUA_REGISTRYINDEX);
}
}
if (lua_isfunction(L, -1)) {
int i = luv_thread_arg_push(L, &work->args, LUVF_THREAD_SIDE_CHILD);
// If exit is called on a thread in the thread pool, abort is called in
// uv__threadpool_cleanup, so exit is not called in luv_cfpcall.
i = lctx->thrd_pcall(L, i, LUA_MULTRET, LUVF_CALLBACK_NOEXIT);
if ( i>=0 ) {
//clear in main threads, luv_after_work_cb
i = luv_thread_arg_set(L, &work->rets, top + 1, lua_gettop(L),
LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
if (i < 0) {
return luv_thread_arg_error(L);
}
lua_pop(L, i); // pop all returned value
luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
}
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD);
} else {
lua_pop(L, 1);
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD);
return luaL_error(L, "Uncaught Error: %s can't be work entry\n",
lua_typename(L, lua_type(L,-1)));
}
if (top!=lua_gettop(L))
return luaL_error(L, "stack not balance in luv_work_cb, need %d but %d", top, lua_gettop(L));
return LUA_OK;
}
static lua_State* luv_work_acquire_vm(void)
{
lua_State* L = uv_key_get(&tls_vmkey);
if (L == NULL)
{
L = acquire_vm_cb();
uv_key_set(&tls_vmkey, L);
lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");
uv_mutex_lock(&vm_mutex);
vms[idx_vms] = L;
idx_vms += 1;
uv_mutex_unlock(&vm_mutex);
}
return L;
}
static void luv_work_cb_wrapper(uv_work_t* req) {
luv_work_t* work = (luv_work_t*)req->data;
lua_State *L = luv_work_acquire_vm();
luv_ctx_t* lctx = luv_context(L);
// If exit is called on a thread in the thread pool, abort is called in
// uv__threadpool_cleanup, so exit is not called in luv_cfpcall.
int i = lctx->thrd_cpcall(L, luv_work_cb, (void*)req, LUVF_CALLBACK_NOEXIT);
if (i != LUA_OK) {
luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD);
}
}
static void luv_after_work_cb(uv_work_t* req, int status) {
luv_work_t* work = (luv_work_t*)req->data;
luv_work_ctx_t* ctx = work->ctx;
lua_State* L = ctx->L;
luv_ctx_t *lctx = luv_context(L);
int i;
(void)status;
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->after_work_cb);
i = luv_thread_arg_push(L, &work->rets, LUVF_THREAD_SIDE_MAIN);
lctx->cb_pcall(L, i, 0, 0);
//ref down to ctx, up in luv_queue_work()
luaL_unref(L, LUA_REGISTRYINDEX, work->ref);
work->ref = LUA_NOREF;
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN);
luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_MAIN);
free(work);
}
static int luv_new_work(lua_State* L) {
size_t len;
char* code;
luv_work_ctx_t* ctx;
luv_thread_dumped(L, 1);
len = lua_rawlen(L, -1);
code = malloc(len);
memcpy(code, lua_tostring(L, -1), len);
lua_pop(L, 1);
luaL_checktype(L, 2, LUA_TFUNCTION);
ctx = (luv_work_ctx_t*)lua_newuserdata(L, sizeof(*ctx));
memset(ctx, 0, sizeof(*ctx));
ctx->len = len;
ctx->code = code;
lua_pushvalue(L, 2);
ctx->after_work_cb = luaL_ref(L, LUA_REGISTRYINDEX);
ctx->L = luv_state(L);
luaL_getmetatable(L, "luv_work_ctx");
lua_setmetatable(L, -2);
return 1;
}
static int luv_queue_work(lua_State* L) {
int top = lua_gettop(L);
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
luv_work_t* work = (luv_work_t*)malloc(sizeof(*work));
int ret;
memset(work, 0, sizeof(*work));
ret = luv_thread_arg_set(L, &work->args, 2, top, LUVF_THREAD_SIDE_MAIN); //clear in sub threads,luv_work_cb
if (ret < 0) {
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN);
free(work);
return luv_thread_arg_error(L);
}
work->ctx = ctx;
work->work.data = work;
ret = uv_queue_work(luv_loop(L), &work->work, luv_work_cb_wrapper, luv_after_work_cb);
if (ret < 0) {
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN);
free(work);
return luv_error(L, ret);
}
//ref up to ctx
lua_pushvalue(L, 1);
work->ref = luaL_ref(L, LUA_REGISTRYINDEX);
lua_pushboolean(L, 1);
return 1;
}
static const luaL_Reg luv_work_ctx_methods[] = {
{"queue", luv_queue_work},
{NULL, NULL}
};
static void luv_key_init_once(void)
{
const char* val;
int status = uv_key_create(&tls_vmkey);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_key_create with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
status = uv_mutex_init(&vm_mutex);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_mutex_init with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
/* ref to https://github.com/libuv/libuv/blob/v1.x/src/threadpool.c init_threads */
nvms = ARRAY_SIZE(default_vms);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nvms = atoi(val);
if (nvms == 0)
nvms = 1;
if (nvms > MAX_THREADPOOL_SIZE)
nvms = MAX_THREADPOOL_SIZE;
vms = default_vms;
if (nvms > ARRAY_SIZE(default_vms)) {
vms = malloc(nvms * sizeof(vms[0]));
if (vms == NULL) {
nvms = ARRAY_SIZE(default_vms);
vms = default_vms;
}
memset(vms, 0, sizeof(vms[0]) * nvms);
}
idx_vms = 0;
}
static void luv_work_cleanup(void)
{
unsigned int i;
if (nvms == 0)
return;
for (i = 0; i < nvms && vms[i]; i++)
release_vm_cb(vms[i]);
if (vms != default_vms)
free(vms);
uv_mutex_destroy(&vm_mutex);
nvms = 0;
}
static void luv_work_init(lua_State* L) {
luaL_newmetatable(L, "luv_work_ctx");
lua_pushcfunction(L, luv_work_ctx_tostring);
lua_setfield(L, -2, "__tostring");
lua_pushcfunction(L, luv_work_ctx_gc);
lua_setfield(L, -2, "__gc");
luaL_newlib(L, luv_work_ctx_methods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);
uv_once(&once_vmkey, luv_key_init_once);
}