20. 다중 상속
21. 추상클래스
22. 예외처리
23. file i/o
24. wxPython
25. SQLite
26. 원격 DB 연동
27. socket
28. Thread
29. 멀티 쓰레드 채팅 프로그램
30. pool
31. process
32. 웹 크롤링
33. HttpServer
34. CGI
35. 챗봇(chatbot)
20. 다중 상속 - 순서 중요
* test_24
print("이전 작업 진행")
class Tiger:
data = '호랑이 세상'
def cry(self):
print('호랑이 울음')
def eat(self):
print('고기 먹음')
class Lion:
data = '사자 세상'
def cry(self):
print('사자 울음')
def hobby(self):
print('사자 낮잠')
class Liger(Tiger, Lion): # 먼저 명시한 부모를 먼저 인식
pass
aa = Liger()
aa.cry() # 호랑이 울음
aa.eat() # 고기 먹음
aa.hobby() # 사자 낮잠
print(aa.data) # 호랑이 세상
print()
class Liger2(Lion, Tiger):
data = 'Liger2 멤버 변수'
def play(self):
print("Liger2 고유 메소드")
self.hobby()
super().hobby()
print(self.data)
print(super().data)
bb = Liger2()
bb.cry() # 사자 울음
bb.eat() # 고기 먹음
bb.hobby() # 사자 낮잠
print(bb.data) # Liger2 멤버 변수
bb.play()
# Liger2 고유 메소드
# 사자 낮잠
# 사자 낮잠
# Liger2 멤버 변수
# 사자 세상
class Animal:
def move(self):
pass
class Dog(Animal):
name = '개'
def move(self):
print('개가 낮에 돌아다님')
class Cat(Animal):
name = '고양이'
def move(self):
print('고양이가 밤에 돌아다님')
class Wolf(Dog, Cat):
pass
class Fox(Cat, Dog):
def move(self):
print('여우가 돌아다님(오버라이딩)')
def foxMove(self):
print('여우 고유 메소드')
dog = Dog()
cat = Cat()
wolf = Wolf()
fox = Fox()
anis = [dog, cat, wolf, fox]
for a in anis:
a.move()
# 개가 낮에 돌아다님
# 고양이가 밤에 돌아다님
# 개가 낮에 돌아다님
# 여우가 돌아다님(오버라이딩)
21. 추상클래스
추상클래스 : 추상 메소드를 가지는 클래스.
추상 메소드: 자식 클래스에서 부모의 추상메소드를 일반 메소드로 반드시 오버라이딩 하도록 강요.
* test25_abstract
from abc import *
class TestClass(metaclass = ABCMeta): # 추상 클래스
@abstractmethod
def abcMethod(self): # 추상 메소드
pass
def normalMethod(self): # 일반 메소드
print('일반 메소드')
tt = TestClass() # TypeError: Can't instantiate abstract class TestClass with abstract methods abcMethod
# 추상클래스가 추상메소드를 가질 경우 인스턴스 생성 시 error 발생.
# 추상클래스가 추상메소드를 가지지 않을 경우 인스턴스 생성 가능.
class Child1(TestClass): # 추상 클래스를 상속 받을 경우 추상메소드를 오버라이드 하지않을 경우 error 발생
name = 'Child1'
def abcMethod(self):
print('Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성')
# TypeError: Can't instantiate abstract class Child1 with abstract methods abcMethod
c1 = Child1()
c1.abcMethod() # Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성
c1.normalMethod() # 일반 메소드
class Child2(TestClass):
name = 'Child2'
def abcMethod(self):
print('Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성')
print('다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.')
def normalMethod(self):
print('부모의 normalMethod를 오버라이딩. 부모 메소드와 다른 역할 수행을 위해.')
c2 = Child2()
c2.abcMethod() # Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성
# 다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.
c2.normalMethod() # 부모의 normalMethod를 오버라이딩. 부모 메소드와 다른 역할 수행을 위해.
print()
다형성 구현
my = c1
my.abcMethod() # 1
#Child1 추상 메소드를 오버라이딩하여 일반 메소드 생성
my = c2
my.abcMethod() # 1과 동일한 명령문이나 기능은 다름.
#Child2 추상 메소드를 오버라이딩하여 일반 메소드 생성
#다른 클래스의 abcMethod와 이름은 같으나 다른 기능을 수행.
클래스 연습문제 24-4
class Employee(metaclass= ABCMeta):
def __init__(self, irum, nai):
self.irum = irum
self.nai = nai
@abstractmethod
def pay(self):
pass
@abstractmethod
def data_print(self):
pass
def irumnai_print(self):
print('이름 : ' + self.irum + ', 나이 : '+ str(self.nai), end = ', ')
class Temporary(Employee):
def __init__(self, irum, nai, ilsu, ildang):
Employee.__init__(self, irum, nai)
self.ilsu = ilsu
self.ildang = ildang
def pay(self):
return self.ilsu * self.ildang;
def data_print(self):
self.irumnai_print()
print('월급 : ' + str(self.pay()))
t = Temporary('홍길도', 25, 20, 15000)
t.data_print() # 이름 : 홍길도, 나이 : 25, 월급 : 300000
class Regular(Employee):
def __init__(self, irum, nai, salary):
Employee.__init__(self, irum, nai)
self.salary = salary
def pay(self): # 추상 메소드 오버라이딩
return self.salary;
def data_print(self):
self.irumnai_print()
print('급여 : ' + str(self.pay()))
r = Regular('한국인', 27, 3500000) # 이름 : 한국인, 나이 : 27, 급여 : 3500000
r.data_print()
class Salseman(Regular):
def __init__(self, irum, nai, salary, sales, commission):
Regular.__init__(self, irum, nai, salary)
self.sales = sales
self.commission = commission
def pay(self): # 일반 메소드 오버라이딩
return super().pay() + self.sales * self.commission;
def data_print(self):
self.irumnai_print()
print('수령액 : ' + str(self.pay()))
s = Salseman('손오공', 29, 1200000, 5000000, 0.25)
s.data_print() # 이름 : 손오공, 나이 : 29, 수령액 : 2450000.0
22. 예외처리
* test26_try
try ~ except - 예상치 않은 에러에 대응하기 위한 문법
def divide(a, b):
return a / b
c = divide(5, 2)
#c = divide(5, 0) # ZeroDivisionError: division by zero
print(c)
print()
try:
c = divide(5, 2)
#c = divide(5, 0)
print(c)
aa = [1, 2]
print(aa[0])
print(aa[1])
#print(aa[2]) # IndexError: list index out of range
f = open('c:/abc.txt') # FileNotFoundError: [Errno 2] No such file or directory: 'c:/abc.txt'
except ZeroDivisionError:
print('두번째 인자에 0을 주지마세요')
except IndexError as e:
print('Index Error : ', e)
except Exception as err:
print("error 발생 : ", err)
finally:
print('에러 유무에 관계없이 반드시 수행됨')
print('프로그램 종료')
23. file i/o
import os
print(os.getcwd())
try:
print('파일 읽기')
f1 = open(r'ftest.txt', mode ='r', encoding ='utf-8')
print(f1) # <_io.TextIOWrapper name='ftest.txt' mode='r' encoding='utf-8'>
print(f1.read())
f1.close() # 파일 처리 종료시 close() : 사용했던 자원을 해제
print('파일로 저장')
f2 = open('ftest2.txt', mode = 'w', encoding = 'utf-8')
f2.write('내 친구들\n')
f2.write('홍길동 고길동\n')
f2.close()
print('저장 성공')
print('파일에 내용추가')
f2 = open('ftest2.txt', mode = 'a', encoding = 'utf-8')
f2.write('손오공\n')
f2.write('조조\n')
f2.close()
print('추가 성공')
print('파일 읽기')
f3 = open(r'ftest2.txt', mode ='r', encoding ='utf-8')
print(f3.read()) # 전체 행 읽기
#print(f3.readline()) # 한 행씩 읽기
f3.close()
except Exception as e:
print('에러 : ', e)
- with문 사용 - close() 자동 수행
try:
# 저장
with open('ftest3.txt', mode = 'w', encoding = 'utf-8') as ff1:
ff1.write('파이썬 문서 저장\n')
ff1.write('내용\n')
ff1.write('with문 사용으로 close() 하지않음\n')
# 읽기
with open('ftest3.txt', mode = 'r', encoding = 'utf-8') as ff2:
print(ff2.read())
except Exception as e2:
print('에러 발생', e2)
- 피클링 : 복합 개체 처리(i/o)
import pickle
try:
dicData = {'tom':'111-1111', 'james':'222-2222'}
listData = ['마우스', '키보드']
tupleData = (dicData, listData) # 복합객체
with open('hi.dat', 'wb') as fobj:
pickle.dump(tupleData, fobj)
pickle.dump(listData, fobj)
print('객체를 파일로 저장하기 성공')
print('객체 읽기')
with open('hi.dat', 'rb') as fobj2:
a, b = pickle.load(fobj2)
print(a) # {'tom': '111-1111', 'james': '222-2222'}
print(b) # ['마우스', '키보드']
c = pickle.load(fobj2)
print(c)
except Exception as e3:
print('error : ' + e3)
* test28_file
- 동 이름으로 우편번호와 주소 출력
try:
dong = input('동이름 입력 : ')
print(dong)
with open(r'zipcode.txt', mode = 'r', encoding = 'euc-kr') as f:
#print(f.read())
line = f.readline()
while line:
lines = line.split('\t') # tab으로 구분
#print(lines)
if lines[3].startswith(dong): # 입력된 동 이름으로 시작되는 주소만 처리
print('[' + lines[0] + '] '+ lines[1] + ' '+ lines[2] + \
' '+ lines[3] + ' '+ lines[4])
line = f.readline() # readline이 없을 때 까지 수행
except Exception as e:
print('err',e)
24. wxPython
- 설치
1 사이트 접속
wxpython.org/Phoenix/snapshot-builds/
Index of /Phoenix/snapshot-builds
wxpython.org
2 " wxPython-4.1.2a1.dev5103+4ab028d7-cp38-cp38-win_amd64.whl " 다운로드
3 anaconda prompt 실행
" cd C:\Users\wonh\Downloads\ " 입력(다운로드 파일 경로)
" dir wx* " 입력 후 하기 파일명 복사
" pip install wxPython-4.1.2a1.dev5103+4ab028d7-cp38-cp38-win_amd64.whl "입력
* test29_gui1
# GUI (윈도우 프로그래밍)
# wxPython
import wx
# app = wx.App(False)
# frame = wx.Frame(None, wx.ID_ANY, 'Hi')
# frame.Show(True)
# app.MainLoop()
class Ex(wx.Frame):
def __init__(self, parent, title):
super(Ex, self).__init__(parent, title = title, size =(300, 300))
# textBox 추가
#self.txtA = wx.TextCtrl(self)
#self.txtA = wx.TextCtrl(self, style = wx.TE_MULTILINE)
# Label, textBox 사용
panel = wx.Panel(self)
wx.StaticText(panel, label = 'i d : ', pos = (5, 5))
wx.StaticText(panel, label = 'pwd : ', pos = (5, 40))
self.txtId = wx.TextCtrl(panel, pos = (40, 5))
self.txtPwd = wx.TextCtrl(panel, pos = (40, 40), size = (200, -1))
# Button : 디자인
btn1 = wx.Button(panel, label = '버튼1', pos = (5,100))
btn2 = wx.Button(panel, label = '버튼2', pos = (100,100))
btn3 = wx.Button(panel, label = '종료', pos = (200,100))
# Button에 대한 이벤트 처리 작업1
# btn1.Bind(wx.EVT_BUTTON, self.btn1Method)
# btn2.Bind(wx.EVT_BUTTON, self.btn2Method)
# btn3.Bind(wx.EVT_BUTTON, self.btn3Method)
# Button에 대한 이벤트 처리 작업2
btn1.id = 1 # 각 버튼의 구분을 위해 id를 준다
btn2.id = 2
btn3.id = 3
btn1.Bind(wx.EVT_BUTTON, self.OnClickMethod)
btn2.Bind(wx.EVT_BUTTON, self.OnClickMethod)
btn3.Bind(wx.EVT_BUTTON, self.OnClickMethod)
self.Center()
self.Show()
def abc(self):
print('일반 메소드')
def btn1Method(self, event): # 이벤트 처리 메소드. event handler
#print('버튼 1을 누르셨습니다')
temp = self.txtId.GetValue()
self.txtPwd.SetLabelText(temp)
def btn2Method(self, event): # 이벤트 처리 메소드. event handler
#print('버튼 2를 누르셨습니다')
msgDial = wx.MessageDialog(self, '메세지', '제목', wx.OK)
msgDial.ShowModal()
msgDial.Destroy()
def btn3Method(self, event): # 이벤트 처리 메소드. event handler
#print('종료')
self.Close() # wx.App()를 종료
def OnClickMethod(self, event):
#print(event.GetEventObject().id)
#print('aa')
if event.GetEventObject().id == 1:
self.txtId.SetLabelText('id');
elif event.GetEventObject().id == 2:
self.txtPwd.SetLabelText('password');
else:
self.Close()
if __name__ == '__main__':
app = wx.App()
Ex(None, title = '연습')
app.MainLoop()
* test30_gui2
# 레이아웃 관리자 클래스 중 Boxsizer 연습
import wx
class MyFrame(wx.Frame):
def __init__(self, parent, title):
super(MyFrame, self).__init__(parent, title=title, size =(300, 350))
panel1 = wx.Panel(self, -1, style = wx.SUNKEN_BORDER)
panel2 = wx.Panel(self, -1, style = wx.SUNKEN_BORDER)
panel1.SetBackgroundColour("Blue")
panel2.SetBackgroundColour("red")
box = wx.BoxSizer(wx.HORIZONTAL) # VERTICAL
box.Add(panel1, 1, wx.EXPAND)
box.Add(panel2, 2, wx.EXPAND)
self.SetSizer(box)
self.Center()
self.Show()
if __name__ == '__main__':
app = wx.App()
MyFrame(None, title = '레이아웃')
app.MainLoop()
- wxformbuilder - UI 편집 도구
* test31_gui3
sourceforge.net/projects/wxformbuilder/
wxFormBuilder
Download wxFormBuilder for free. ALL DEVELOPMENT MOVED TO GITHUB https://github.com/wxFormBuilder/wxFormBuilder wxFormBuilder - a RAD tool for wxWidgets GUI design.
sourceforge.net
import wx
import wx.xrc
class MyFrame1 ( wx.Frame ):
def __init__( self, parent ):
wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = u"연습", pos = wx.DefaultPosition, size = wx.Size( 288,154 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
#self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
bSizer1 = wx.BoxSizer( wx.VERTICAL )
self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"이름 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText1.Wrap( -1 )
bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
self.txtName = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer2.Add( self.txtName, 0, wx.ALL, 5 )
self.m_panel1.SetSizer( bSizer2 )
self.m_panel1.Layout()
bSizer2.Fit( self.m_panel1 )
bSizer1.Add( self.m_panel1, 0, wx.ALL|wx.EXPAND, 5 )
self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText2 = wx.StaticText( self.m_panel2, wx.ID_ANY, u"나이 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText2.Wrap( -1 )
bSizer3.Add( self.m_staticText2, 0, wx.ALL, 5 )
self.txtAge = wx.TextCtrl( self.m_panel2, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer3.Add( self.txtAge, 0, wx.ALL, 5 )
self.btnOk = wx.Button( self.m_panel2, wx.ID_ANY, u"확인", wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer3.Add( self.btnOk, 0, wx.ALL, 5 )
self.m_panel2.SetSizer( bSizer3 )
self.m_panel2.Layout()
bSizer3.Fit( self.m_panel2 )
bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer4 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText3 = wx.StaticText( self.m_panel3, wx.ID_ANY, u"결과 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText3.Wrap( -1 )
bSizer4.Add( self.m_staticText3, 0, wx.ALL, 5 )
self.staShow = wx.StaticText( self.m_panel3, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
self.staShow.Wrap( -1 )
bSizer4.Add( self.staShow, 0, wx.ALL, 5 )
self.m_panel3.SetSizer( bSizer4 )
self.m_panel3.Layout()
bSizer4.Fit( self.m_panel3 )
bSizer1.Add( self.m_panel3, 0, wx.EXPAND |wx.ALL, 5 )
self.SetSizer( bSizer1 )
self.Layout()
self.Centre( wx.BOTH )
# Connect Events
self.btnOk.Bind( wx.EVT_BUTTON, self.OnClickMethod )
def __del__( self ):
pass
# Virtual event handlers, overide them in your derived class
def OnClickMethod( self, event ):
#event.Skip()
name = self.txtName.GetValue()
if name == "":
wx.MessageBox('이름 입력', '알림', wx.OK)
return
age = self.txtAge.GetValue()
self.staShow.SetLabelText(name+" " + age)
if __name__ == '__main__':
app = wx.App()
MyFrame1(None).Show()
app.MainLoop()
* test32_gui_calc
import wx
import wx.xrc
class MyFrame1 ( wx.Frame ):
def __init__( self, parent ):
wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 308,303 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
#self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
bSizer1 = wx.BoxSizer( wx.VERTICAL )
self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"숫자1", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText1.Wrap( -1 )
bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
self.txtNum1 = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer2.Add( self.txtNum1, 0, wx.ALL, 5 )
self.m_panel1.SetSizer( bSizer2 )
self.m_panel1.Layout()
bSizer2.Fit( self.m_panel1 )
bSizer1.Add( self.m_panel1, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText2 = wx.StaticText( self.m_panel2, wx.ID_ANY, u"숫자2", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText2.Wrap( -1 )
bSizer3.Add( self.m_staticText2, 0, wx.ALL, 5 )
self.txtNum2 = wx.TextCtrl( self.m_panel2, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer3.Add( self.txtNum2, 0, wx.ALL, 5 )
self.m_panel2.SetSizer( bSizer3 )
self.m_panel2.Layout()
bSizer3.Fit( self.m_panel2 )
bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer4 = wx.BoxSizer( wx.VERTICAL )
rdoOpChoices = [ u"+", u"-", u"*", u"/" ]
self.rdoOp = wx.RadioBox( self.m_panel3, wx.ID_ANY, u"연산자 선택", wx.DefaultPosition, wx.DefaultSize, rdoOpChoices, 1, wx.RA_SPECIFY_ROWS )
self.rdoOp.SetSelection( 2 )
bSizer4.Add( self.rdoOp, 0, wx.ALL|wx.EXPAND, 5 )
self.m_panel3.SetSizer( bSizer4 )
self.m_panel3.Layout()
bSizer4.Fit( self.m_panel3 )
bSizer1.Add( self.m_panel3, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel4 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer5 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText3 = wx.StaticText( self.m_panel4, wx.ID_ANY, u"연산결과 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText3.Wrap( -1 )
bSizer5.Add( self.m_staticText3, 0, wx.ALL, 5 )
self.staResult = wx.StaticText( self.m_panel4, wx.ID_ANY, u"0", wx.DefaultPosition, wx.DefaultSize, 0 )
self.staResult.Wrap( -1 )
bSizer5.Add( self.staResult, 0, wx.ALL, 5 )
self.m_panel4.SetSizer( bSizer5 )
self.m_panel4.Layout()
bSizer5.Fit( self.m_panel4 )
bSizer1.Add( self.m_panel4, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel5 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer6 = wx.BoxSizer( wx.HORIZONTAL )
self.btnCalc = wx.Button( self.m_panel5, wx.ID_ANY, u"계산", wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer6.Add( self.btnCalc, 0, wx.ALL, 5 )
self.btnClear = wx.Button( self.m_panel5, wx.ID_ANY, u"초기화", wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer6.Add( self.btnClear, 0, wx.ALL, 5 )
self.btnExit = wx.Button( self.m_panel5, wx.ID_ANY, u"종료", wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer6.Add( self.btnExit, 0, wx.ALL, 5 )
self.m_panel5.SetSizer( bSizer6 )
self.m_panel5.Layout()
bSizer6.Fit( self.m_panel5 )
bSizer1.Add( self.m_panel5, 1, wx.EXPAND |wx.ALL, 5 )
self.SetSizer( bSizer1 )
self.Layout()
self.Centre( wx.BOTH )
# Connect Events
self.btnCalc.id = 1 # id 부여
self.btnClear.id = 2
self.btnExit.id = 3
self.btnCalc.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
self.btnClear.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
self.btnExit.Bind( wx.EVT_BUTTON, self.OnCalcProcess )
def __del__( self ):
pass
# Virtual event handlers, overide them in your derived class
def OnCalcProcess( self, event ):
sel_id = event.GetEventObject().id # event 발생 객체의 id를 get
#print(sel_id)
if sel_id == 1: # 계산 버튼을 누를 경우
op = self.rdoOp.GetStringSelection() # 선택된 라디오 버튼의 문자열 get
#print(op)
num1 = self.txtNum1.GetValue()
num2 = self.txtNum2.GetValue()
if num1 == '' or num2 == '':
wx.MessageBox('값을 입력하시오', '알림', wx.OK)
return
try:
result = eval(num1 + op + num2)
except Exception as e:
wx.MessageBox('연산오류', '알림', wx.OK)
return
self.staResult.SetLabel(str(result))
elif sel_id == 2: # 초기화
self.txtNum1.SetLabel('')
self.txtNum2.SetLabel('')
self.staResult.SetLabel('')
self.rdoOp.SetSelection(0)
self.txtNum1.SetFocus() # UX/UI
elif sel_id == 3:
dlg = wx.MessageDialog(self, '정말 종료할까요?', '알림', wx.YES_NO) # MessageDialog style 입력
temp = dlg.ShowModal() # MessageDialog 창 발생
if temp == wx.ID_YES: # YES 버튼을 누를 경우
dlg.Destroy() # MessageDialog창 닫기
self.Close() # Frame 닫기
if __name__ == '__main__':
app = wx.App()
MyFrame1(None).Show()
app.MainLoop()
25. SQLite
: 개인용 DB SQLite3 : Python에 기본 설치됨
: 환경변수 path에 추가 : C:\anaconda3\Library\bin
* test33_sqlite
import sqlite3
print(sqlite3.version_info)
#conn = sqlite3.connect('example.db')
conn = sqlite3.connect(':memory:') # ram에 DB 생성 - 테스트용으로 연습할 때
try:
cur = conn.cursor()
cur.execute("create table if not exists friends(name text, phone text)")
cur.execute("insert into friends values('홍길동', '111-1111')")
cur.execute("insert into friends values(?, ?)", ('tom', '222-2222'))
conn.commit()
cur.execute("select * from friends")
for f in cur:
print(f[0] + " " + f[1])
except Exception as e:
print(e)
conn.rollback()
finally:
conn.close()
26. 원격 DB 연동
: maria db
: anaconda prompt : pip install mysqlclient 입력
* test34_db
import MySQLdb
conn = MySQLdb.connect(host = '127.0.0.1', user = 'root', password='123', database='test')
print(conn)
conn.close
sangdata table 자료 읽기
import MySQLdb
config = {
'host':'127.0.0.1',
'user':'root',
'password':'123',
'database':'test',
'port':3306,
'charset':'utf8',
'use_unicode':True
}
try:
conn = MySQLdb.connect(**config)
cursor = conn.cursor() # sql문 처리용
#자료추가1
sql = "insert into sangdata(code, sang, su, dan) values(10, '새우깡', 5, 1200)"
cursor.execute(sql)
conn.commit()
sql = "insert into sangdata values(%s, %s, %s, %s)"
#sql_data = ('12', '감자깡', 10, 2000)
sql_data = '12', '감자깡', 10, 2000
re = cursor.execute(sql, sql_data) # 성공하면 1, 실패시 0
print(cursor)
print(re)
conn.commit()
# 자료 수정
sql = "update sangdata set sang=%s, su=%s, dan=%s where code=%s"
sql_data = '깡', 7, 1000, 12
cou = cursor.execute(sql, sql_data) # 성공하면 n, 실패시 0
print('성공건수 ',cou)
conn.commit()
# 자료삭제
code = '10'
sql = "delete from sangdata where code =" + code # 비권장
cursor.execute(sql)
sql = "delete from sangdata where code = %s" # 권장 1
cursor.execute(sql, (code,)) # 튜플
sql = "delete from sangdata where code = '{0}'".format(code) # 권장 2
cou = cursor.execute(sql)
print('성공건수 ',cou)
conn.commit()
# 자료읽기 1
sql = "select code, sang, su, dan from sangdata"
cursor.execute(sql) # DB의 자료를 읽어 cursor 객체가 그 결과를 기억
for data in cursor.fetchall():
#print(data)
print('%s %s %s %s'%data)
# 자료읽기 2
for r in cursor:
print(r[0], r[1], r[2], r[3])
# 자료읽기 3
for (code, sang, su, dan) in cursor:
print(code, sang, su, dan)
for (a, b, c, d) in cursor:
print(a, b, c, d)
except MySQLdb.connections.Error as err:
print('db err' + str(err))
except Exception as e:
print('err' + str(e))
finally:
cursor.close()
conn.close()
- 키보드로 부서번호를 입력받아 해당 부서에 근무하는 직원 출력
* test35_db
import MySQLdb
import sys
config = {
'host':'127.0.0.1',
'user':'root',
'password':'123',
'database':'test',
'port':3306,
'charset':'utf8',
'use_unicode':True
}
try:
conn = MySQLdb.connect(**config)
cursor = conn.cursor()
buser_no = input('부서번호 입력 : ')
sql = """
select jikwon_no, jikwon_name, buser_num, jikwon_jik, jikwon_pay
from jikwon
where buser_num ={0}
""".format(buser_no)
cursor.execute(sql)
datas = cursor.fetchall()
if len(datas) == 0:
print(str(buser_no)+ '번 부서는 없습니다.')
sys.exit()
for d in datas:
print(d[0], d[1], d[2], d[3])
print('인원수 : ' + str(len(datas)))
except Exception as e:
print('err', e)
finally:
cursor.close()
conn.close()
* test36_db_gui
config = {
'host':'127.0.0.1',
'user':'root',
'password':'123',
'database':'test',
'port':3306,
'charset':'utf8',
'use_unicode':True
}
- 파일 분리
import ast
with open('mariadb.txt', mode = 'r') as f:
#print(f.read())
aa = f.read()
config = ast.literal_eval(aa)
import wx
import wx.xrc
import MySQLdb
class MyGogek ( wx.Frame ):
def __init__( self, parent ):
wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = u"고객자료 보기", pos = wx.DefaultPosition, size = wx.Size( 500,336 ), style = wx.DEFAULT_FRAME_STYLE|wx.TAB_TRAVERSAL )
#self.SetSizeHintsSz( wx.DefaultSize, wx.DefaultSize )
self.SetBackgroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_MENU ) )
bSizer1 = wx.BoxSizer( wx.VERTICAL )
self.m_panel1 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
self.m_staticText1 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"사번 : ", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText1.Wrap( -1 )
bSizer2.Add( self.m_staticText1, 0, wx.ALL, 5 )
self.txtNo = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer2.Add( self.txtNo, 0, wx.ALL, 5 )
self.m_staticText2 = wx.StaticText( self.m_panel1, wx.ID_ANY, u"직원명", wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_staticText2.Wrap( -1 )
bSizer2.Add( self.m_staticText2, 0, wx.ALL, 5 )
self.txtName = wx.TextCtrl( self.m_panel1, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer2.Add( self.txtName, 0, wx.ALL, 5 )
self.btnLogin = wx.Button( self.m_panel1, wx.ID_ANY, u"로그인", wx.DefaultPosition, wx.DefaultSize, 0 )
bSizer2.Add( self.btnLogin, 0, wx.ALL, 5 )
self.m_panel1.SetSizer( bSizer2 )
self.m_panel1.Layout()
bSizer2.Fit( self.m_panel1 )
bSizer1.Add( self.m_panel1, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel2 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer3 = wx.BoxSizer( wx.HORIZONTAL )
self.staMsg = wx.StaticText( self.m_panel2, wx.ID_ANY, u"고객목록", wx.DefaultPosition, wx.DefaultSize, 0 )
self.staMsg.Wrap( -1 )
bSizer3.Add( self.staMsg, 0, wx.ALL, 5 )
self.m_panel2.SetSizer( bSizer3 )
self.m_panel2.Layout()
bSizer3.Fit( self.m_panel2 )
bSizer1.Add( self.m_panel2, 0, wx.EXPAND |wx.ALL, 5 )
self.m_panel3 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer4 = wx.BoxSizer( wx.HORIZONTAL )
self.lstGogek = wx.ListCtrl( self.m_panel3, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.LC_REPORT )
bSizer4.Add( self.lstGogek, 1, wx.ALL|wx.EXPAND, 5 )
self.m_panel3.SetSizer( bSizer4 )
self.m_panel3.Layout()
bSizer4.Fit( self.m_panel3 )
bSizer1.Add( self.m_panel3, 1, wx.EXPAND |wx.ALL, 5 )
self.m_panel4 = wx.Panel( self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
bSizer5 = wx.BoxSizer( wx.HORIZONTAL )
self.staCount = wx.StaticText( self.m_panel4, wx.ID_ANY, u"인원수", wx.DefaultPosition, wx.DefaultSize, 0 )
self.staCount.Wrap( -1 )
bSizer5.Add( self.staCount, 0, wx.ALL, 5 )
self.m_panel4.SetSizer( bSizer5 )
self.m_panel4.Layout()
bSizer5.Fit( self.m_panel4 )
bSizer1.Add( self.m_panel4, 1, wx.EXPAND |wx.ALL, 5 )
self.SetSizer( bSizer1 )
self.Layout()
self.Centre( wx.BOTH )
# lstGogek 객체에 제목 작성
self.lstGogek.InsertColumn(0, '고객번호', width=100)
self.lstGogek.InsertColumn(1, '고객명', width=150)
self.lstGogek.InsertColumn(2, '고객전화', width=200)
# Connect Events
self.btnLogin.Bind( wx.EVT_BUTTON, self.OnLogin )
def __del__( self ):
pass
# Virtual event handlers, overide them in your derived class
def OnLogin( self, event ):
if self.txtNo.GetValue() == '': # 사번을 입력안할 경우 팝업 발생
wx.MessageBox('사번입력', '알림', wx.OK)
self.txtNo.SetFocus()
return
if self.txtName.GetValue() == '': # 사번을 입력안할 경우 팝업 발생
wx.MessageBox('직원명 입력', '알림', wx.OK)
self.txtNo.SetFocus()
return
self.LoginCheck()
def LoginCheck(self):
try:
conn = MySQLdb.connect(**config)
cursor = conn.cursor()
no = self.txtNo.GetValue()
name = self.txtName.GetValue()
#print(no, name)
sql ="""
select count(*) from jikwon
where jikwon_no ='{0}' and jikwon_name ='{1}'
""".format(no, name)
cou = cursor.execute(sql)
count = cursor.fetchone()[0]
#print(count) # 1
if count == 0:
wx.MessageBox('로그인 실패', '알림', wx.OK)
return
else:
self.staMsg.SetLabelText(no + '번 직원의 관리 고객목록')
self.DisplayData(no) # 직원자료 출력 메소드 호출
except Exception as e:
print('LoginCheck err',e)
finally:
cursor.close()
conn.close()
def DisplayData(self, no):
try:
conn = MySQLdb.connect(**config)
cursor = conn.cursor()
sql = """
select gogek_no, gogek_name, gogek_tel
from gogek
where gogek_damsano = {}
""".format(no)
cou = cursor.execute(sql)
datas = cursor.fetchall()
#print(datas)
self.lstGogek.DeleteAllItems() # list control 초기화
for d in datas:
i = self.lstGogek.InsertItem(1000, 0) # List Control 최대 행수 입력
self.lstGogek.SetItem(i, 0, str(d[0]))
self.lstGogek.SetItem(i, 1, d[1])
self.lstGogek.SetItem(i, 2, d[2])
self.staCount.SetLabelText('인원수 : ' + str(len(datas)))
except Exception as e:
print('DisplayData err',e)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
app = wx.App()
MyGogek(None).Show()
app.MainLoop()
- mariadb.txt
{
'host':'127.0.0.1',
'user':'root',
'password':'123',
'database':'test',
'port':3306,
'charset':'utf8',
'use_unicode':True
}
27. socket
- Socket : TCP/IP Protocol(통신규약)의 프로그래머 인터페이스를 모듈로 지원
* socket_test
import socket
print(socket.getservbyname('http','tcp')) # http port 80
print(socket.getservbyname('telnet','tcp')) # http port 23
print(socket.getservbyname('ftp','tcp')) # http port 21
print(socket.getservbyname('smtp','tcp')) # http port 25
print(socket.getservbyname('pop3','tcp')) # http port 110
print()
print(socket.getaddrinfo('www.naver.com', 80, proto=socket.SOL_TCP))
# [(<AddressFamily.AF_INET: 2>, 0, 6, '', ('125.209.222.142', 80)), (<AddressFamily.AF_INET: 2>, 0, 6, '', ('223.130.195.200', 80))]
print(socket.getaddrinfo('www.daum.net', 80, proto=socket.SOL_TCP))
* soc1_server
# 접속상태 확인을 위한 단순 Echo Server - client의 요청에 1회만 반응
from socket import *
serverSock = socket(AF_INET, SOCK_STREAM) # socket(socket 종류, socket 유형)
serverSock.bind(('127.0.0.1', 9999))
serverSock.listen(1) # TCP 리스너 설정
print('server start')
conn, addr = serverSock.accept()
print('client addr :', addr)
print('from client message : ', conn.recv(1024).decode())
# 클라이언트로 부터 바이트 단위 문자열로 수신된 내용 출력
conn.close()
serverSock.close()
* soc1_client
# 1회 접속용 클라이언트
from socket import *
clientSock = socket(AF_INET, SOCK_STREAM)
clientSock.connect(('127.0.0.1', 9999))
clientSock.sendall('안녕 반가워'.encode(encoding='utf_8', errors='strict'))
clientSock.close()
* soc2_server
# 서버 서비스 계속 유지
import socket
import sys
HOST = ''
PORT = 8888
serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
serverSock.bind((HOST, PORT))
print('server start')
serverSock.listen(5) # 클라이언트 접속 대기. 최대 동시 접속수는 1 ~ 5
while True:
conn,addr = serverSock.accept() # 클라이언트 접속 승인
print('client info : ', addr[0], addr[1])
print(conn.recv(1024).decode()) # 클라이언트가 전송한 메세지 수신 후 출력
# 전송
conn.send('from server : ' + str(addr[1]) + 'ㅁㅁㅁ'.encode('utf_8'))
except socket.error as err:
print('socket error : ', err)
except Exception as e:
print('error : ', e)
finally:
serverSock.close()
* soc2_client
# 1회 접속용 클라이언트
from socket import *
clientSock = socket(AF_INET, SOCK_STREAM)
clientSock.connect(('127.0.0.1', 8888))
clientSock.sendall('안녕 반가워'.encode(encoding='utf_8'))
re_msg = clientSock.recv(1024).decode()
print('수신자료', re_msg)
clientSock.close()
28. Thread
- process : 실행 중인 응용프로그램을 의미, 프로세스 단위로 별도의 메모리를 사용
from subprocess import *
Popen('calc.exe')
Popen('notepad.exe')
- thread : Light weight process 라고도 함. 메인 프로세스(스레드)와 병렬적으로 수행되는 단위 프로그램. 스레드 단위로 함수나 메소드를 수행 가능.
* th1
import time
def run(id):
for i in range(1, 11):
print('id:{} -> {}'.format(id, i))
time.sleep(0.5)
1. thread를 사용하지 않은 경우
run(1) # 순차적으로 호출되므로 순차적으로 처리됨
run(2)
2. thread를 사용한 경우 : 스레드 스케쥴러에 의해 랜덤하게 스레드 처리가 됨
import threading
th1 = threading.Thread(target=run, args = ('일')) # 사용자 정의 스레드 생성
th2 = threading.Thread(target=run, args = ('이'))
th1.start() # 스레드 수행 시작
th2.start()
th1.join() # 사용자 정의 스레드가 종료될때까지 메인 스레드 대기 시킴
th2.join()
print('메인 프로그램 종료') # 메인 스레드에 의해 메인 모듈이 실행(기본 값)
* th2
스레드를 이용하여 날짜 및 시간 출력
import time
import threading
now = time.localtime()
#print(now)
print('현재는 {0}년 {1}월 {2} 일 {3} 시 {4} 분 {5} 초'.format(now.tm_year, now.tm_mon, \
now.tm_mday ,now.tm_hour,now.tm_min,now.tm_sec))
def cal_show():
now = time.localtime()
print('현재는 {0}년 {1}월 {2} 일 {3} 시 {4} 분 {5} 초'.format(now.tm_year, now.tm_mon, \
now.tm_mday ,now.tm_hour,now.tm_min,now.tm_sec))
def myRun():
while True:
now2 = time.localtime()
if now2.tm_min == 57:
break
cal_show()
time.sleep(1)
th = threading.Thread(target=myRun)
th.start()
th.join()
class MyThread(threading.Thread):
def run(self):
for i in range(1, 10):
print('id{} --> {}'.format(self.getName(), i))
time.sleep(0.1)
ths = []
for i in range(2):
th = MyThread()
th.start()
ths.append(th)
for th in ths:
th.join()
print('프로그램 종료')
- 스레드간 공유자원 값 충돌 방지 - 동기화
* th3
import threading, time
g_count = 0 # 전역변수는 자동으로 스레드의 공유자원이 됨
lock = threading.Lock()
def threadCount(id, count):
global g_count
for i in range(count):
lock.acquire() # 두 개 이상의 스레드 공유 자원 충돌방지. lock을 걸어 다른 스레드 수행대기.
print('id %s => count : %s, g_count : %s'%(id, i, g_count))
g_count += 1
lock.release() # lock을 해제
for i in range(1, 6):
threading.Thread(target=threadCount, args =(i, 5)).start()
time.sleep(2)
print('final g_count : ', g_count)
print('finish process')
- 스레드의 활성화/비활성화
* th4
import threading, time
bread_plate = 0 # 빵 접시 - 공유자원
lock = threading.Condition()
class Maker(threading.Thread): # 빵 생성자
def run(self):
global bread_plate
for i in range(30):
lock.acquire() # 공유자운 충동방지
while bread_plate >=10:
print('빵 생산 초과로 대기')
lock.wait() # 쓰레드 비활성화
bread_plate += 1
print('빵 생산 후 접시에 담기 : ', bread_plate)
lock.notify() # 쓰레드 활성화
lock.release()
time.sleep(0.05)
class Eater(threading.Thread): # 빵 소비자
def run(self):
global bread_plate
for i in range(30):
lock.acquire() # 공유자원 충동방지
while bread_plate <1:
print('빵이 없어 대기')
lock.wait() # 쓰레드 비활성화
bread_plate -= 1
print('빵 소비 후 접시의 빵 수 : ', bread_plate)
lock.notify() # 쓰레드 활성화
lock.release()
time.sleep(0.07)
mak = []
con = []
for i in range(5): # 빵 생산자 수
mak.append(Maker())
for i in range(5): # 빵 소비자 수
con.append(Eater())
for th1 in mak:
th1.start()
for th2 in con:
th2.start()
for th1 in mak:
th1.join()
for th2 in con:
th2.join()
print('process 끝')
29. 멀티 쓰레드 채팅 프로그램
: socket, thread 사용
* chatServer (서버)
import socket
import threading
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 서버 소켓 생성
ss.bind(('127.0.0.1', 5000)) # ip에 연결
ss.listen(5) # 동접속자 수 최대 5로 리스너 설정
print('Chatting start')
users = []
def ChatUser(conn):
name = conn.recv(1024)
data = '[알림]' + name.decode('utf_8') + '님이 접속하셨습니다.'
print(data)
try:
for p in users: # 다른 클라이언트에도 접속 메시지 send
p.send(data.encode('utf_8'))
while True: # 채팅 진행
msg = conn.recv(1024) # 수신한 메세지
data = '('+name.decode('utf_8') + ') : ' + msg.decode() # 송부자 + 메세지
print('[R] '+data)
for p in users: # 다른 클라이언트에도 메시지 send
p.send(data.encode('utf_8'))
except Exception as e: # 접속 종료 시
users.remove(conn) # 접속 종료 시 list에 conn 제거
data = '[알림]' + name.decode('utf_8') + '님이 접속을 종료하셨습니다.'
print(data)
print(e)
if users: # 다른 클라이언트에도 접속 종료 메시지 send
for p in users:
p.send(data.encode('utf_8'))
else:
print('exit')
while True:
conn, addr = ss.accept() # 소켓 접속 및 대기. 클라이언트 접속 시 값 리턴
users.append(conn) # 접속 시 list에 conn 추가
th = threading.Thread(target=ChatUser, args = (conn, )) # 접속자 수 만큼 쓰레드 생성
th.start()
* chatClient (클라이언트)
import socket
import threading
import sys
def Handle(socket):
while True:
data = socket.recv(1024)
if not data: continue
print(data.decode('utf_8')) # 파이썬 표준 출력은 버퍼링 된다.
#print(data.decode('utf_8', flush=true))
sys.stdout.flush() # 표준 입출력 버퍼 비움
cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 클라이언트 소켓 생성
cs.connect(('127.0.0.1', 5000)) # 해당 ip에 접속
name = input('채팅 아이디 입력 : ')
cs.send(name.encode('utf_8')) # 접속 id send
th = threading.Thread(target=Handle, args = (cs,)) # 쓰레드 생성
th.start()
while True:
msg = input() # 채팅 메세지 입력
sys.stdout.flush()
if not msg:continue
cs.send(msg.encode('utf_8')) # msg가 있을 경우 msg send
cs.close()
30. pool
GIL(Global Interpreter Lock) 파이썬 인터프리터가 한 스레드만 하나의 바이트코드를 실행 시킬 수 있도록 해주는 Lock
하나의 스레드에 모든 자원을 허락하고 그 후에는 Lock을 걸어 다른 스레드는 실행할 수 없게 막아버린다.
그런 이유로 구조적으로 충돌이 발생하는 경우가 발생한다.
이에 개선사항으로 멀티 프로세싱 모듈 지원한다.
Pool : 압력 값에 대해 process들을 건너건너 분배하여 함수 실행을 병렬하는 방법
PID : process id
* 07_multi_pool
from multiprocessing import Pool
import time
import os
def f(x):
print('값', x, '에 대한 작업 pid', os.getpid()) # 현재 진행 중인 process의 processId를 반환
time.sleep(1)
return x * x
if __name__ == '__main__':
p = Pool(3) # pool 객체 생성. 프로세스 수 3 ~ 5 권장
startTime = int(time.time())
for i in range(0, 10): # 0 ~ 9
print(f(i)) # 10
# 값 0 에 대한 작업 pid 75580
# 0
# 값 1 에 대한 작업 pid 75580
# 1
# 값 2 에 대한 작업 pid 75580
# 4
# 값 3 에 대한 작업 pid 75580
# 9
# 값 4 에 대한 작업 pid 75580
# 16
# 값 5 에 대한 작업 pid 75580
# 25
# 값 6 에 대한 작업 pid 75580
# 36
# 값 7 에 대한 작업 pid 75580
# 49
# 값 8 에 대한 작업 pid 75580
# 64
# 값 9 에 대한 작업 pid 75580
# 81
# 총 작업 시간 : 10
print(p.map(f, range(0, 10))) # 함수와 인자값을 매핑하면서 데이터를 분배처리
# 값 0 에 대한 작업 pid 75712
# 값 1 에 대한 작업 pid 75604
# 값 2 에 대한 작업 pid 75540
# 값 3 에 대한 작업 pid 75712
# 값 4 에 대한 작업 pid 75604
# 값 5 에 대한 작업 pid 75540
# 값 6 에 대한 작업 pid 75712
# 값 7 에 대한 작업 pid 75604
# 값 8 에 대한 작업 pid 75540
# 값 9 에 대한 작업 pid 75712
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 총 작업 시간 : 5
endTime = int(time.time())
print('총 작업 시간 : ', (endTime - startTime))
31. process
멀티 프로세실을 위한 Process 클래스
하나의 프로세스를 하나의 함수에 적당한 인자값을 할당해주고 (없을 수도 있음) 진행
* 08_multi_process
import time
import os
from multiprocessing import Process
def func():
print('연속 진행하고자 하는 어떤 작업')
#time.sleep(1)
def doubler(number):
result = number + 10
func()
proc = os.getpid()
print('number : {0}, number : {1}, process id : {2}'.format(number, result, proc))
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5]
procs = []
for index, number in enumerate(numbers):
proc = Process(target = doubler, args = (number, ))
procs.append(proc) # Process에 join() 추가할 의도
proc.start() # doubler 함수가 호출
for proc in procs:
proc.join()
# 연속 진행하고자 하는 어떤 작업
# number : 1, number : 11, process id : 11880
# 연속 진행하고자 하는 어떤 작업
# number : 2, number : 12, process id : 59068
# 연속 진행하고자 하는 어떤 작업
# number : 3, number : 13, process id : 55000
# 연속 진행하고자 하는 어떤 작업
# number : 4, number : 14, process id : 74932
# 연속 진행하고자 하는 어떤 작업
# number : 5, number : 15, process id : 55096
32. 웹 크롤링
멀티 프로세싱을 통한 웹 크롤링 연습 1 - 멀티 프로세싱 없이 작업
* 09_multi_web1
import requests
from bs4 import BeautifulSoup as bs
# html, xml 지원, json 지원x
import time
def get_links(): # 해당 컨텐츠의 a tag 얻기
data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
soup = bs(data, 'html.parser')
my_titles = soup.select('h3 > a')
data = []
for title in my_titles:
data.append(title.get('href')) # a tag의 속성 중 href 값 반환
return data
def get_content(link):
abs_link = "https://beomi.github.io" + link
req = requests.get(abs_link)
html = req.text
soup = bs(html, 'html.parser')
print(soup.select('h1')[0].text) # 첫번째 h1 tag의 문자열 출력
if __name__ == '__main__':
#print(get_links())
#print(len(get_links())) # 26
start_time = time.time()
for link in get_links():
get_content(link)
end_time = time.time()
print('소요시간 : %s 초'%(end_time- start_time))
# 나만의 웹 크롤러 만들기(4): Django로 크롤링한 데이터 저장하기
# 나만의 웹 크롤러 만들기(3): Selenium으로 무적 크롤러 만들기
# Django에 Social Login 붙이기: Django세팅부터 Facebook/Google 개발 설정까지
# Django에 Custom인증 붙이기
# 나만의 웹 크롤러 만들기(2): Login with Session
# 나만의 웹 크롤러 만들기 with Requests/BeautifulSoup
# Celery로 TelegramBot 알림 보내기
# Virtualenv/VirtualenvWrapper OS별 설치&이용법
# [DjangoTDDStudy] #02: UnitTest 이용해 기능 테스트 하기
# [DjangoTDDStudy] #01: 개발환경 세팅하기(Selenium / ChromeDriver)
# [DjangoTDDStudy] #00: 스터디를 시작하며
# Fabric Put 커맨드가 No Such File Exception을 반환할 때 해결법
# CKEditor의 라이센스와 오픈소스 라이센스
# ReactNative The Basis 번역을 끝냈습니다.
# [React Native 번역]#01: 시작하기
# [번역] 장고(Django)와 함께하는 Celery 첫걸음
# Chrome Native Adblockr 대체하기
# CustoMac 설치 분투기
# Ubuntu14.04에 OhMyZsh 설치
# Ubuntu14.04에서 pip로 mysqlclient 설치 실패시
# Ubuntu14.04에서 Python3기반 virtualenvwrapper 설치
# mac OS X에서 pip virtualenvwrapper 설치 시 uninstalling six 에서 Exception 발생 시
# Fabric for Python3 (Fabric3)
# Windows에서 pip로 mysqlclient 설치 실패시(python3.4/3.5)
# 맥에서 윈도RDP로 접속시 한영전환하기.
# pip로 mysqlclient설치 중 mac os x에서 egg_info / OSError 발생시 대처방법
# 소요시간 : 7.727251768112183 초
멀티 프로세싱을 통한 웹 크롤링 연습 2 - 멀티 프로세싱 사용
* 10_multi_web2
import requests
from bs4 import BeautifulSoup as bs
import time
from multiprocessing import Pool
def get_links(): # 해당 컨텐츠의 a tag 얻기
data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
soup = bs(data, 'html.parser')
my_titles = soup.select('h3 > a')
data = []
for title in my_titles:
data.append(title.get('href')) # a tag의 속성 중 href 값 반환
return data
def get_content(link):
abs_link = "https://beomi.github.io" + link
req = requests.get(abs_link)
html = req.text
soup = bs(html, 'html.parser')
print(soup.select('h1')[0].text) # 첫번째 h1 tag의 문자열 출력
if __name__ == '__main__':
start_time = time.time()
# for link in get_links():
# get_content(link)
pool = Pool(processes = 4)
pool.map(get_content, get_links())
end_time = time.time()
print('소요시간 : %s 초'%(end_time- start_time))
# Django에 Social Login 붙이기: Django세팅부터 Facebook/Google 개발 설정까지
# 나만의 웹 크롤러 만들기(4): Django로 크롤링한 데이터 저장하기
# Celery로 TelegramBot 알림 보내기
# 나만의 웹 크롤러 만들기(2): Login with Session
# 나만의 웹 크롤러 만들기(3): Selenium으로 무적 크롤러 만들기
# Django에 Custom인증 붙이기
# 나만의 웹 크롤러 만들기 with Requests/BeautifulSoup
# Virtualenv/VirtualenvWrapper OS별 설치&이용법
# [DjangoTDDStudy] #00: 스터디를 시작하며
# [DjangoTDDStudy] #02: UnitTest 이용해 기능 테스트 하기
# CKEditor의 라이센스와 오픈소스 라이센스
# [React Native 번역]#01: 시작하기
# Fabric Put 커맨드가 No Such File Exception을 반환할 때 해결법
# [DjangoTDDStudy] #01: 개발환경 세팅하기(Selenium / ChromeDriver)
# ReactNative The Basis 번역을 끝냈습니다.
# [번역] 장고(Django)와 함께하는 Celery 첫걸음
# Ubuntu14.04에 OhMyZsh 설치
# Chrome Native Adblockr 대체하기
# Ubuntu14.04에서 Python3기반 virtualenvwrapper 설치
# Fabric for Python3 (Fabric3)
# Ubuntu14.04에서 pip로 mysqlclient 설치 실패시
# CustoMac 설치 분투기
# mac OS X에서 pip virtualenvwrapper 설치 시 uninstalling six 에서 Exception 발생 시
# Windows에서 pip로 mysqlclient 설치 실패시(python3.4/3.5)
# 맥에서 윈도RDP로 접속시 한영전환하기.
# pip로 mysqlclient설치 중 mac os x에서 egg_info / OSError 발생시 대처방법
# 소요시간 : 3.3195197582244873 초
33. HttpServer
HttpServer 구축용 클래스를 이용해 웹서버 서비스 하기
HTTPServer : 기본적인 socket 연결을 관리하는 클래스
SimpleHTTPRequestHandler : get, head 요청만 처리가능
* 01_httpServer1
from http.server import SimpleHTTPRequestHandler, HTTPServer
PORT = 7777
handler = SimpleHTTPRequestHandler
serv = HTTPServer(('127.0.0.1', PORT), handler) # HTTPServer 서버 객체 생성
print('HTTPServer 서비스 시작')
serv.serve_forever() # HTTPServer 서비스 시작
* index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h2>파이썬</h2>
<a href="https://daum.net">DAUM 접속</a>
<br>
<a href="bbb.html">bbb 문서</a>
<hr>
바닥글. 작성자 : <b style="font-family : 궁서">홍길동</b>
</body>
</html>
* bbb.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
단순웹 운영중<br>
<input type = "button" value = "이전으로" onclick="history.back()">
</body>
</html>
34. CGI
CGI(Common Gateway Interface)란 웹서버(정보제공측)와 클라이언트(정보이용측)간에 필요한 정보교환을 가능하게 해주는 일종의 웹인터페이스라고(일종의 프로그램) 할 수 있습니다.
PHP, ASP, ASP.NET, JSP...
CGI를 지원하는 CGIHTTPRequestHandler를 사용하면 클라이언트와 서버 사이에 자료를 주고 받을 수 있다. py 문서를 처리가능
get, post 요청처리 모두 지원
* 01_httpServer2
from http.server import CGIHTTPRequestHandler, HTTPServer
PORT = 8888
class HandlerClass(CGIHTTPRequestHandler):
cgi_directories = ['/cgi-bin']
serv = HTTPServer(('127.0.0.1', PORT), HandlerClass) # HTTPServer 서버 객체 생성
print('HTTPServer 서비스 시작')
serv.serve_forever() # HTTPServer 서비스 시작
* main.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h1>메인 페이지</h1>
<ul>
<li><a href="https://www.naver.com">네이버</a></li>
<li><a href="cgi-bin/hello.py">hello.py</a></li>
<li><a href="cgi-bin/world.py">world.py</a></li>
<li><a href="cgi-bin/my.py?name=tom&age=23">my.py - get방식으로 자료 전달</a></li>
<li><a href="friend.html">friend</a></li>
<li><a href="cgi-bin/sangpum.py">상품자료 확인</a></li>
</ul>
</body>
</html>
* hello
# 웹 서비스가 가능한 파이썬 파일 작성
ss = '파이썬 변수'
kbs = 9
mbc = 10 + 1.0
print('Content-Type:text/html;charset=utf-8\n')
print('<html>')
print('<body>')
print('<h2>파이썬 문서로 정보 전달</h2>')
print('<b><i>hi</i></b>')
print('<br>파이썬 변수 출력 : %s, %d, %f'%(ss, kbs, mbc))
print('</body>')
print('</html>')
* world
s1 = "자료1"
s2 = "자료2"
print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
<h2>html문서 좀더 쉽게 작성</h2>
자료 출력 : {0}, {1}
<br>
<img src='../images/pic.png' width='60%' />
<br>
<a href='../main.html'>메인</a>
</body>
</html>
""".format(s1, s2))
* my
#client가 server로 자료 전달
import cgi
# 사용자(client)가 입력한 자료 받기 - get
form = cgi.FieldStorage()
name = form['name'].value
age = form['age'].value
print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
사용자가 입력한 자료 <br>
이름 : {0},
나이 : {1}
</body>
</html>
""".format(name, age))
* friend.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
친구 자료 입력<p/>
<form action="cgi-bin/friend.py" method="post">
이름 : <input type="text" name ="name" value="홍길동"/><br>
전화 : <input type="text" name ="phone"/><br>
성별 :
<input type="radio" name="gen" value="남" checked="checked"/>남자
<input type="radio" name="gen" value="여"/>여자<br>
<br>
<input type="submit" value="전송">
</form>
</body>
</html>
* friend.py
import cgi
# 사용자(client)가 입력한 자료 받기 - post
form = cgi.FieldStorage()
name = form['name'].value
phone = form['phone'].value
gen = form['gen'].value
# 값으로 DB저장, 연산에 참여 작업 진행필요.
# 단순 출력 진행.
print('Content-Type:text/html;charset=utf-8\n')
print("""
<html>
<body>
이름 : {}<br>
전화번호 : {}<br>
성별 : {}<br>
</body>
</html>
""".format(name, phone, gen))
* sangpum
# Maria DB의 sangpum table 자료를 웹으로 출력
import MySQLdb
import ast
with open('cgi-bin/mariadb.txt', mode='r') as f:
config = ast.literal_eval(f.read())
print('Content-Type:text/html;charset=utf-8\n')
print('<html><body><h2>상품자료 (Python 서버이용)</h2>')
print('<table border="1">')
print('<tr><th>코드</th><th>품명</th><th>수량</th><th>단가</th></tr>')
try:
conn = MySQLdb.connect(**config)
cursor = conn.cursor()
cursor.execute("select * from sangdata")
datas = cursor.fetchall()
for code, sang, su, dan in datas:
print("""<tr>
<td>{0}</td>
<td>{1}</td>
<td>{2}</td>
<td>{3}</td>
</tr>""".format(code, sang, su, dan))
except Exception as e:
print('error',e)
finally:
cursor.close()
conn.close()
print('</table></html></body>')
35. 챗봇(chatbot)
① 사이트 접속 후 파일 다운로드
www.lfd.uci.edu/~gohlke/pythonlibs/#jpype
Python Extension Packages for Windows - Christoph Gohlke
by Christoph Gohlke, Laboratory for Fluorescence Dynamics, University of California, Irvine. Updated on 14 February 2021 at 04:31 UTC. This page provides 32- and 64-bit Windows binaries of many scientific open-source extension packages for the official CPy
www.lfd.uci.edu
[JPype1-1.2.0-cp38-cp38-win_amd64.whl] Download
② anaconda prompt 접속 후 명령어 입력
cd C:\Users\wonh\Downloads
pip install JPype1-1.2.0-cp38-cp38-win_amd64.whl
pip install --upgrade pip
pip install konlpy
* main.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<h1>메인 페이지</h1>
<ul>
<li>인사관리</li>
<li>영업관리</li>
<li>게시판</li>
<li><a href="cgi-bin/chatbot.py">챗봇</a></li>
</ul>
<pre>
본문
</pre>
<hr>
<footer>바닥글</footer>
</body>
</html>
* httpServer
# 챗봇용 서버
from http.server import CGIHTTPRequestHandler, HTTPServer
PORT = 8080
class HandlerClass(CGIHTTPRequestHandler):
cgi_directories = ['/cgi-bin']
serv = HTTPServer(('127.0.0.1', PORT), HandlerClass) # HTTPServer 서버 객체 생성
print('챗봇용 HTTPServer 서비스 시작')
serv.serve_forever() # HTTPServer 서비스 시작
* chatbot
import cgi
from botengine import make_reply
# 입력 양식의 글자 추출하기 ---
form = cgi.FieldStorage()
# 메인 처리 ---
def go_main():
m = form.getvalue("m", default="")
if m == "" :
show_form()
elif m == "say" :
api_say()
# 사용자의 입력에 응답하기 ---
def api_say():
print("Content-Type: text/plain; charset=utf-8")
print("")
txt = form.getvalue("txt", default="")
if txt == "":
return
res = make_reply(txt)
print(res)
# 입력 양식 출력하기 ---
def show_form():
print("Content-Type: text/html; charset=utf-8")
print("")
print("""
<html><meta charset="utf-8"><body>
<script src="https://code.jquery.com/jquery-3.1.1.min.js"></script>
<style>
h1{ background-color: #ffe0e0; }
div{ padding:10px; }
span{ border-radius: 10px; background-color: #ffe0e0; padding:8px; }
.bot{ text-align: left; }
.usr{ text-align: right; }
</style>
<h1>* 대화하기 *</h1>
<div id="chat"></div>
<div class='usr'>
<input id="txt" size="40">
<button onclick="say()">전송</button>
</div>
<script>
var url = "./chatbot.py";
function say() {
var txt = $('#txt').val();
$.get(url, {"m":"say","txt":txt},
function(res) {
var html = "<div class='usr'><span>" + esc(txt) +
"</span>: 나</div><div class='bot'> 봇:<span>" + esc(res) + "</span></div>";
$('#chat').html($('#chat').html()+html);
$('#txt').val('').focus();
});
}
function esc(s) {
return s.replace('&', '&').replace('<','<').replace('>', '>');
}
</script></body></html>
""")
go_main()
* botengine
import codecs
from bs4 import BeautifulSoup
import urllib.request
from konlpy.tag import Okt
import os, re, json, random
dict_file = "chatbot-data.json"
dic = {}
twitter = Okt()
# 딕셔너리에 단어 등록하기 ---
def register_dic(words):
global dic
if len(words) == 0: return
tmp = ["@"]
for i in words:
word = i[0]
if word == "" or word == "\r\n" or word == "\n": continue
tmp.append(word)
if len(tmp) < 3: continue
if len(tmp) > 3: tmp = tmp[1:]
set_word3(dic, tmp)
if word == "." or word == "?":
tmp = ["@"]
continue
# 딕셔너리가 변경될 때마다 저장하기
json.dump(dic, open(dict_file,"w", encoding="utf-8"))
# 딕셔너리에 글 등록하기
def set_word3(dic, s3):
w1, w2, w3 = s3
if not w1 in dic: dic[w1] = {}
if not w2 in dic[w1]: dic[w1][w2] = {}
if not w3 in dic[w1][w2]: dic[w1][w2][w3] = 0
dic[w1][w2][w3] += 1
# 문장 만들기 ---
def make_sentence(head):
if not head in dic: return ""
ret = []
if head != "@":
ret.append(head)
top = dic[head]
w1 = word_choice(top)
w2 = word_choice(top[w1])
ret.append(w1)
ret.append(w2)
while True:
if w1 in dic and w2 in dic[w1]:
w3 = word_choice(dic[w1][w2])
else:
w3 = ""
ret.append(w3)
if w3 == "." or w3 == "? " or w3 == "":
break
w1, w2 = w2, w3
ret = "".join(ret)
# 띄어쓰기
params = urllib.parse.urlencode({
"_callback": "",
"q": ret
})
# 네이버의 맞춤법 검사기 api를 사용
data = urllib.request.urlopen("https://m.search.naver.com/p/csearch/ocontent/spellchecker.nhn?" + params)
data = data.read().decode("utf-8")[1:-2]
data = json.loads(data)
data = data["message"]["result"]["html"]
data = soup = BeautifulSoup(data, "html.parser").getText()
return data
def word_choice(sel):
keys = sel.keys()
return random.choice(list(keys))
# 챗봇 응답 만들기 ---
def make_reply(text):
# 단어 학습 시키기
if not text[-1] in [".", "?"]: text += "."
words = twitter.pos(text)
register_dic(words)
# 사전에 단어가 있다면 그것을 기반으로 문장 만들기
for word in words:
face = word[0]
if face in dic:
return make_sentence(face)
return make_sentence("@")
# 딕셔너리가 있다면 읽어 들이기
if os.path.exists(dict_file):
dic = json.load(open(dict_file, "r"))
'Programming 언어 > Python' 카테고리의 다른 글
[Python] 파이썬 정리 링크 (0) | 2021.02.17 |
---|---|
[Python] 파이썬 정리 4 - 함수, 변수의 생존 범위, 클로저, 일급함수, 모듈, 사용자 정의 모듈, turtle, 외부 모듈 사용, 클래스, 포함, 상속, 오버라이딩 (0) | 2021.02.09 |
[Python] 파이썬 정리 3 - 변수, 연산자, 집합형 자료, 정규 표현식, 조건 판단문 if문, 반복문 while, 반복문 for (0) | 2021.02.08 |
[Python] 파이썬 정리 2 - 기초 (0) | 2021.02.08 |
[Python] 파이썬 정리 1 - 환경구축(이클립스) (0) | 2021.02.08 |