#include "pch.h"
#include "allegdb.h"
// Thread procedure for the SQL queue thread.
//
// pvSQLQueueThread is the CSQLQueueThread that owns this thread. The procedure
// blocks until either the thread's "die" event is signaled or a message is
// posted to this thread's message queue, then drains the queue:
//   wm_sql_addquery - lParam carries a CSQLQuery* that is handed to AddQuery.
//   WM_QUIT         - treated the same as the die event; the thread exits.
//   anything else   - reported through ZError.
// Always returns 0.
unsigned CALLBACK SQLQueueProc(void * pvSQLQueueThread)
{
  CSQLQueueThread * pSQLQueueThread = (CSQLQueueThread *) pvSQLQueueThread;

  // Only one waitable handle: the die event. Posted messages wake the wait
  // through the QS_ALLINPUT mask and are reported as WAIT_OBJECT_0 + cHandles.
  HANDLE rgeventHandles[1];
  rgeventHandles[0] = pSQLQueueThread->GetEventDie();
  DWORD cHandles = sizeof(rgeventHandles) / sizeof(rgeventHandles[0]);

  DWORD dwWait = WAIT_TIMEOUT;
  do
  {
    dwWait = MsgWaitForMultipleObjects(cHandles, rgeventHandles, FALSE, INFINITE, QS_ALLINPUT);

    // Automatic (not static) storage: a static buffer in a thread entry point
    // would be shared by every thread running this procedure.
    MSG msg;

    // Drain everything that is queued, unless we have been told to die.
    while (WAIT_OBJECT_0 != dwWait && PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
    {
      TranslateMessage(&msg);
      switch (msg.message)
      {
        case wm_sql_addquery:
        {
          pSQLQueueThread->AddQuery((CSQLQuery*) msg.lParam);
          break;
        }

        case WM_QUIT:
          // Reuse the die-event wait code so both loops terminate.
          dwWait = WAIT_OBJECT_0;
          break;

        default:
          ZError("SQLQueueProc: Unexpected thread message.\n");
          break;
      }
    }
  } while (WAIT_OBJECT_0 != dwWait);

  return 0;
}
// Thread procedure for a SQL worker thread.
//
// pvsqlThread is the CSQLThread that owns this thread. The worker is opened
// first; if that fails the failing HRESULT is returned as the thread exit
// code. Otherwise the thread waits on two handles - the die event and either
// the notify or the silent semaphore, depending on GetNotify() - and services
// one query each time the semaphore is signaled. Returns 0 once the die event
// is signaled.
unsigned CALLBACK SQLThreadProc(void * pvsqlThread)
{
  CSQLThread * pWorker = (CSQLThread *) pvsqlThread;
  CSQLCore * pCore = pWorker->GetSQLCore();

  HRESULT hr = E_FAIL;
  hr = pWorker->Open();
  if (FAILED(hr))
  {
    return hr;
  }

  // Index 0: die event. Index 1: the semaphore for this worker's kind of query.
  HANDLE rghWait[2];
  rghWait[0] = pWorker->GetEventDie();
  if (pWorker->GetNotify())
  {
    rghWait[1] = pCore->GetNotifySemaphore();
  }
  else
  {
    rghWait[1] = pCore->GetSilentSemaphore();
  }
  DWORD cWait = sizeof(rghWait) / sizeof(rghWait[0]);

  for (;;)
  {
    DWORD dwSignaled = WaitForMultipleObjects(cWait, rghWait, FALSE, INFINITE);

    if (WAIT_OBJECT_0 == dwSignaled)
    {
      // Die event: leave the service loop.
      break;
    }

    if (WAIT_OBJECT_0 + 1 == dwSignaled)
    {
      // A query is available on this worker's queue; fetch and run it.
      CSQLQuery * pquery = pWorker->GetNotify() ? pCore->GetNotifyQuery()
                                                : pCore->GetSilentQuery();
      hr = pWorker->ServiceQuery(pquery);
    }
    else
    {
      ZError("Unexpected object signaled in SQLThreadProc.\n");
    }
  }

  return 0;
}
// Initializes the SQL core: validates the requested thread counts, creates the
// kill event, opens the database connection, creates the two query semaphores,
// and constructs the queue thread plus the notify and silent worker threads.
//
// strSQLConfig    - initialization string passed to
//                   m_connection.OpenFromInitializationString.
// nThreadIDNotify - thread id stored in m_nThreadIDNotify.
// cSilentThreads  - number of CSQLThread objects created with notify == false.
// cNotifyThreads  - number of CSQLThread objects created with notify == true.
//
// Returns E_INVALIDARG when the combined thread count is outside 1..20, the
// failing HRESULT when the connection cannot be opened, and otherwise the
// HRESULT returned by OpenFromInitializationString.
HRESULT CSQLCore::Init(LPCOLESTR strSQLConfig, DWORD nThreadIDNotify, DWORD cSilentThreads,
                       DWORD cNotifyThreads)
{
  // Bound each count on its own before summing so that DWORD wrap-around
  // (e.g. 0xFFFFFFFF + 2) cannot slip past the range check below.
  const DWORD cMaxThreads = 20;
  if (cSilentThreads > cMaxThreads || cNotifyThreads > cMaxThreads)
    return E_INVALIDARG;

  const DWORD cTotalThreads = cSilentThreads + cNotifyThreads;
  if (cTotalThreads < 1 || cTotalThreads > cMaxThreads)
    return E_INVALIDARG;

  m_nThreadIDNotify = nThreadIDNotify;
  m_cSilentThreads = cSilentThreads;
  m_cNotifyThreads = cNotifyThreads;

  // Manual-reset, initially non-signaled event handed to every thread object.
  m_hKillSQLEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  assert(m_hKillSQLEvent);

  HRESULT hr = m_connection.OpenFromInitializationString(strSQLConfig);
  if (FAILED(hr))
  {
    DumpErrorInfo(m_connection.m_spInit, IID_IDBInitialize, NULL);
    return hr;
  }

  // One counting semaphore per query kind, both starting at zero.
  m_hQueryNotify = CreateSemaphore(NULL, 0, MAXLONG, NULL);
  m_hQuerySilent = CreateSemaphore(NULL, 0, MAXLONG, NULL);

  m_pthdQueue = new CSQLQueueThread(m_hKillSQLEvent, this);
  assert (m_pthdQueue);

  // Allocate the pointer arrays with their real element type: the destructor
  // releases them with delete [] through a CSQLThread **, so the allocation
  // must be an array of CSQLThread * rather than a char buffer. The trailing
  // () value-initializes every slot to null so unfilled slots are safe to
  // delete.
  m_pargSilentThreads = new CSQLThread *[m_cSilentThreads]();
  m_pargNotifyThreads = new CSQLThread *[m_cNotifyThreads]();

  DWORD i = 0;
  for (i = 0; i < m_cNotifyThreads; i++)
  {
    m_pargNotifyThreads[i] = new CSQLThread(m_hKillSQLEvent, true, this);
  }

  for (i = 0; i < m_cSilentThreads; i++)
  {
    m_pargSilentThreads[i] = new CSQLThread(m_hKillSQLEvent, false, this);
  }

  return hr;
}
// Destroys every worker thread object, frees the two pointer arrays that held
// them, and closes the notify and silent semaphore handles.
//
// NOTE(review): m_pthdQueue and m_hKillSQLEvent are not released here -
// confirm they are cleaned up elsewhere.
CSQLCore::~CSQLCore()
{
  // Elements are only visited when the array itself exists; delete [] on a
  // null array pointer is a no-op.
  if (m_pargNotifyThreads)
  {
    for (DWORD iNotify = 0; iNotify < m_cNotifyThreads; ++iNotify)
    {
      delete m_pargNotifyThreads[iNotify];
    }
  }
  delete [] m_pargNotifyThreads;

  if (m_pargSilentThreads)
  {
    for (DWORD iSilent = 0; iSilent < m_cSilentThreads; ++iSilent)
    {
      delete m_pargSilentThreads[iSilent];
    }
  }
  delete [] m_pargSilentThreads;

  CloseHandle(m_hQueryNotify);
  CloseHandle(m_hQuerySilent);
}
// Queues pquery on the list that matches its notify flag and then releases the
// corresponding semaphore by one.
void CSQLCore::AddQuery(CSQLQuery * pquery)
{
  HANDLE hSemaphore = NULL;

  if (!pquery->GetNotify())
  {
    // Silent query: goes on the silent list.
    m_listQueriesSilent.last(pquery);
    hSemaphore = GetSilentSemaphore();
  }
  else
  {
    // Notify query: goes on the notify list.
    m_listQueriesNotify.last(pquery);
    hSemaphore = GetNotifySemaphore();
  }

  // Bump the semaphore count by one for the query just queued.
  ReleaseSemaphore(hSemaphore, 1, NULL);
}
// Hands pquery to the queue thread by posting a wm_sql_addquery thread message
// whose lParam carries the query pointer. The result of PostThreadMessage is
// not checked.
void CSQLCore::PostQuery(CSQLQuery * pquery)
{
  const DWORD idQueueThread = m_pthdQueue->GetThreadID();
  const WPARAM wParam = (WPARAM) NULL;
  const LPARAM lParam = (LPARAM) pquery;

  PostThreadMessage(idQueueThread, wm_sql_addquery, wParam, lParam);
}
// Executes one query on this worker thread and signals its completion.
//
// The thread keeps a list of queries it has already prepared (m_listQueries),
// looked up by GUID. On a miss, a copy of pqueryNew is stored in the list and
// prepared against m_session. On a hit, pqueryNew->Copy(pqueryCache) is called
// (presumably transferring the new parameters into the cached query - confirm
// against CSQLQuery::Copy). The cached query is then executed, retrying for as
// long as DumpErrorInfo sets the retry flag, and pqueryCache->Copy(pqueryNew)
// is called afterwards (presumably transferring results back - confirm).
//
// Completion is signaled either by posting wm_sql_querydone to the notify
// thread or by calling pqueryNew->DataReady() directly on this thread.
//
// Returns the HRESULT of the prepare step if it failed, otherwise the HRESULT
// of the last execute attempt.
HRESULT CSQLThread::ServiceQuery(CSQLQuery * pqueryNew)
{
  CSQLQuery * pqueryCache = NULL;
  REFGUID guid = pqueryNew->GetGuid();
  HRESULT hrQuery = E_FAIL;

  // Look for a previously stored query with the same GUID; stop at the first match.
  for (SLinkQueries * plinkQuery = m_listQueries.first(); plinkQuery && !pqueryCache; plinkQuery = plinkQuery->next())
  {
    CSQLQuery * pqueryT = plinkQuery->data();
    if (pqueryT->GetGuid() == guid)
    {
      // Same GUID is expected to mean same query text.
      assert(!lstrcmp(pqueryNew->GetStrQuery(), pqueryT->GetStrQuery()));
      pqueryCache = pqueryT;
    }
  }

  if (!pqueryCache)
  {
    // First time this thread sees the query: store a copy and prepare it.
    pqueryCache = pqueryNew->Copy(NULL);
    m_listQueries.first(pqueryCache);
    hrQuery = pqueryCache->OnPrepare(m_session);
  }
  else
  {
    hrQuery = S_OK;
    pqueryNew->Copy(pqueryCache);
  }

  // Only execute when the prepare step (or cache hit) succeeded.
  bool fRetry = SUCCEEDED(hrQuery);
  int cRetries = 0;
  while (fRetry)
  {
    hrQuery = pqueryCache->OnExecute();
    if (SUCCEEDED(hrQuery))
    {
      fRetry = false;
    }
    else
    {
      // DumpErrorInfo receives &fRetry and so decides whether to try again.
      m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, &fRetry);
      if (fRetry)
      {
        // Increment outside the logging call so the back-off below still
        // grows if debugf is compiled out and never evaluates its arguments.
        ++cRetries;
        debugf("Query deadlocked or timed-out. Retry #%d\n", cRetries);
        Sleep(50 * cRetries); // linear back-off: 50ms, 100ms, 150ms, ...
      }
    }
  }

  if (FAILED(hrQuery))
    m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, NULL);

  pqueryCache->Copy(pqueryNew);

  if (pqueryNew->GetCallbackOnMainThread())
  {
    BOOL bPosted = PostThreadMessage(GetSQLCore()->GetNotifyThreadID(), wm_sql_querydone, (WPARAM) NULL, (LPARAM) pqueryNew);
    if (!bPosted)
    {
      // Fall back to an in-thread callback only when the notify thread id is
      // invalid; any other post failure is left unhandled here.
      DWORD dwLastError = ::GetLastError();
      if (ERROR_INVALID_THREAD_ID == dwLastError)
      {
        pqueryNew->DataReady();
      }
    }
  }
  else
  {
    pqueryNew->DataReady();
  }

  return hrQuery;
}