Невероятно параллельное обновление БД с использованием Python (PostGIS / PostgreSQL)

Мне нужно обновить каждую запись в пространственной базе данных, в которой у меня есть набор данных точек, которые перекрывают набор данных полигонов. Для каждого точечного объекта я хочу назначить ключ, чтобы связать его с полигональным объектом, внутри которого он находится. Итак, если моя точка «Нью-Йорк» находится в пределах многоугольника США, а для многоугольника США «GID = 1» я назначу «gid_fkey = 1» для моей точки Нью-Йорк.

Для этого я создал следующий запрос.

procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

В настоящее время я получаю информацию cityID из другого запроса, который просто выбирает все cityID, где gid_fkey имеет значение NULL. По сути, мне просто нужно перебрать их и выполнить показанный ранее запрос. Поскольку запрос полагается только на статическую информацию из другой таблицы, теоретически все эти процессы могут выполняться одновременно. Я реализовал описанную ниже процедуру многопоточности, но, похоже, не могу перейти на многопроцессорную обработку.

import psycopg2, pprint, threading, time, Queue

queue = Queue.Queue()
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()

getGID = 'SELECT cityID FROM city'
pyCursor1.execute(getGID)
gidList = pyCursor1.fetchall()

class threadClass(threading.Thread):

def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

def run(self):

        while True:
            gid = self.queue.get()

            procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

            pyCursor2 = pyConn.cursor()                         
            pyCursor2.execute(procQuery)

            print gid[0]                    
            print 'Done'

def main():

    for i in range(4):
        t = threadClass(queue)
        t.setDaemon(True)
        t.start()

        for gid in gidList:
            queue.put(gid)

    queue.join()

main()

Я даже не уверен, оптимальна ли многопоточность, но она определенно быстрее, чем поочередно.

Машина, которую я буду использовать, имеет четыре ядра (Quad Core) и минимальную ОС Linux без графического интерфейса, PostgreSQL, PostGIS и Python, если это имеет значение.

Что мне нужно изменить, чтобы включить эту до боли простую многопроцессорную задачу?


person EnE_    schedule 20.09.2011    source источник


Ответы (2)


Хорошо, это ответ на мой собственный пост. Молодец, я = D

Повышает скорость моей системы примерно на 150% при переходе от одноядерного потока к четырехъядерному многопроцессорному процессу.

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue

def run(self):
    proc_name = self.name
    while True:
        next_task = self.task_queue.get()
        if next_task is None:
            print 'Tasks Complete'
            self.task_queue.task_done()
            break            
        answer = next_task()
        self.task_queue.task_done()
        self.result_queue.put(answer)
    return


class Task(object):
def __init__(self, a):
    self.a = a

def __call__(self):        
    pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConn.set_isolation_level(0)
    pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

    pyCursor1.execute(procQuery)
    print 'What is self?'
    print self.a

    return self.a

def __str__(self):
    return 'ARC'
def run(self):
    print 'IN'

if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()

num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
    w.start()

pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()

pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
temp = pyCursorX.fetchall()    
num_job = temp[0]
num_jobs = num_job[0]

pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
cityIdListTuple = pyCursorX.fetchall()    

cityIdList = []

for x in cityIdListTuple:
    cityIdList.append(x[0])


for i in xrange(num_jobs):
    tasks.put(Task(cityIdList[i - 1]))

for i in xrange(num_consumers):
    tasks.put(None)

while num_jobs:
    result = results.get()
    print result
    num_jobs -= 1

Теперь у меня есть еще один вопрос, который я разместил здесь:

Создание соединения с БД и поддержка нескольких процессов (многопроцессорность)

Надеюсь, мы сможем избавиться от некоторых накладных расходов и еще больше ускорить этот процесс.

person EnE_    schedule 26.09.2011
comment
эй @ene, если это решило ваш вопрос, рекомендуется пометить его как ответ :) - person eberbis; 08.03.2016
comment
Да, это странно, потому что я был всего лишь гостевым пользователем или кем-то еще, когда я разместил этот вопрос, у меня нет возможности отметить свой вопрос как правильный. Вы можете видеть, что изображение эскиза не обновилось вместе с моим. Предложения по разрешению приветствуются. - person EnE_; 09.03.2016
comment
о да ... проблема в том, что вы разместили свой вопрос от имени другого (незарегистрированного) пользователя (stackoverflow.com/users/954992/ene), и теперь вы используете зарегистрированный (stackoverflow.com/users/965035/ene) для ответа. Как видите, идентификаторы на них разные. Это может помочь: meta.stackexchange.com/questions / 74024 / - person eberbis; 09.03.2016
comment
Никогда не запускайте операцию ввода-вывода с использованием процесса Mutli, всегда используйте Async или Multi threading - person Mrinal Kamboj; 26.03.2021

В простом SQL можно было сделать что-то вроде:

UPDATE city ci
SET gid_fkey = co.gid 
FROM country co 
WHERE ST_within(ci.the_geom , co.the_geom) 
AND ci.city_id = _some_parameter_
        ;

Может возникнуть проблема, если город будет соответствовать нескольким странам (вызывая несколько обновлений одной и той же целевой строки), но это, вероятно, не относится к вашим данным.

person joop    schedule 08.06.2015